Source code for spyrit.misc.statistics

from __future__ import print_function, division
from typing import Any
from pathlib import Path
from PIL import Image

import time
import torch
import torchvision
import numpy as np
from scipy.stats import rankdata

import spyrit.misc.walsh_hadamard as wh
import spyrit.core.torch as spytorch


[docs] def stat_walsh_ImageNet( stat_root=Path("./stats/"), data_root=Path("./data/ILSVRC2012_img_test_v10102019/"), img_size=128, batch_size=256, n_loop=1, device=torch.device("cuda:0" if torch.cuda.is_available() else "cpu"), ): r""" Args: :attr:`data_root` needs to have all images in a subfolder Example: >>> from pathlib import Path >>> from spyrit.misc.statistics import stat_walsh_ImageNet >>> data_root = Path('../data/ILSVRC2012_v10102019') >>> stat_root = Path('../stat/ILSVRC2012_v10102019') >>> stat_walsh_ImageNet(stat_root = stat_root, data_root = data_root, img_size = 32, batch_size = 1024) """ dataloaders = data_loaders_ImageNet( data_root, img_size=img_size, batch_size=batch_size, seed=7 ) # Walsh ordered transforms time_start = time.perf_counter() stat_walsh(dataloaders["train"], device, stat_root, n_loop) time_elapsed = time.perf_counter() - time_start print(f"Computed in {time_elapsed} seconds")
[docs] def stat_walsh_stl10( stat_root=Path("./stats/"), data_root=Path("./data/"), img_size=64, batch_size=1024, device=torch.device("cuda:0" if torch.cuda.is_available() else "cpu"), ): """ Args: :attr:`data_root` is expected to contain an 'stl10_binary' subfolder with the test*.bin, train*.bin and unlabeled_X.bin files. Example: >>> data_root = Path('../datasets/') >>> stat_root = Path('../stat/stl10') >>> from spyrit.misc.statistics import stat_walsh_stl10 >>> stat_walsh_stl10(stat_root = stat_root, data_root = data_root) """ dataloaders = data_loaders_stl10( data_root, img_size=img_size, batch_size=batch_size, seed=7 ) # Walsh ordered transforms time_start = time.perf_counter() stat_walsh(dataloaders["train"], device, stat_root) time_elapsed = time.perf_counter() - time_start print(f"Computed in {time_elapsed} seconds")
[docs] def data_loaders_ImageNet( train_root, val_root=None, img_size=64, batch_size=512, seed=7, shuffle=False ): """ Args: Both 'train_root' and 'val_root' need to have images in a subfolder shuffle=True to shuffle train set only (test set not shuffled) The output of torchvision datasets are PILImage images in the range [0, 1]. We transform them to Tensors in the range [-1, 1]. Also RGB images are converted into grayscale images. """ torch.manual_seed(seed) # reproductibility of random crop # transform = torchvision.transforms.Compose( [ torchvision.transforms.functional.to_grayscale, torchvision.transforms.RandomCrop( size=(img_size, img_size), pad_if_needed=True, padding_mode="edge" ), torchvision.transforms.ToTensor(), torchvision.transforms.Normalize([0.5], [0.5]), ] ) # train set trainset = torchvision.datasets.ImageFolder(root=train_root, transform=transform) trainloader = torch.utils.data.DataLoader( trainset, batch_size=batch_size, shuffle=shuffle ) # validation set (if any) if val_root is not None: valset = torchvision.datasets.ImageFolder(root=val_root, transform=transform) valloader = torch.utils.data.DataLoader( valset, batch_size=batch_size, shuffle=False ) else: valloader = None dataloaders = {"train": trainloader, "val": valloader} return dataloaders
[docs] class CenterCrop: """ Args: img_size=int, image size Center crop if image not square in order to ensure that all images have same size """ def __init__(self, img_size): self.img_size = img_size self.centerCrop = torchvision.transforms.CenterCrop(img_size) def __call__(self, inputs, *args: Any, **kwds: Any): # Center crop if not square img_shape = inputs.size if img_shape[0] != img_shape[1]: return self.centerCrop(inputs) else: return inputs
[docs] def transform_gray_norm(img_size): """ Args: img_size=int, image size Create torchvision transform for natural images (stl10, imagenet): convert them to grayscale, then to tensor, and normalize between [-1, 1] """ transform = torchvision.transforms.Compose( [ torchvision.transforms.functional.to_grayscale, torchvision.transforms.Resize(img_size), # torchvision.transforms.CenterCrop(img_size), CenterCrop(img_size), torchvision.transforms.ToTensor(), torchvision.transforms.Normalize([0.5], [0.5]), ] ) return transform
[docs] def data_loaders_stl10( data_root, img_size=64, batch_size=512, seed=7, shuffle=False, download=True ): """ Args: shuffle=True to shuffle train set only (test set not shuffled) The output of torchvision datasets are PILImage images in the range [0, 1]. We transform them to Tensors in the range [-1, 1]. Also RGB images are converted into grayscale images. """ transform = transform_gray_norm(img_size) trainset = torchvision.datasets.STL10( root=data_root, split="train+unlabeled", download=download, transform=transform ) trainloader = torch.utils.data.DataLoader( trainset, batch_size=batch_size, shuffle=shuffle ) testset = torchvision.datasets.STL10( root=data_root, split="test", download=download, transform=transform ) testloader = torch.utils.data.DataLoader( testset, batch_size=batch_size, shuffle=False ) dataloaders = {"train": trainloader, "val": testloader} return dataloaders
[docs] def stat_walsh(dataloader, device, root, n_loop=1): """ nloop > 1 is relevant for dataloaders with random crops such as that provided by data_loaders_ImageNet """ # Get dimensions and estimate total number of images in the dataset inputs, _ = next(iter(dataloader)) (_, _, nx, ny) = inputs.shape # -------------------------------------------------------------------------- # 1. Mean # -------------------------------------------------------------------------- mean = mean_walsh(dataloader, device, n_loop=n_loop) # Save if n_loop == 1: path = root / Path("Average_{}x{}".format(nx, ny) + ".npy") else: path = root / Path("Average_{}_{}x{}".format(n_loop, nx, ny) + ".npy") if not root.exists(): root.mkdir() np.save(path, mean.cpu().detach().numpy()) # -------------------------------------------------------------------------- # 2. Covariance # ------------------------------------------------------------------------- cov = cov_walsh(dataloader, mean, device, n_loop=n_loop) # Save if n_loop == 1: path = root / Path("Cov_{}x{}".format(nx, ny) + ".npy") else: path = root / Path("Cov_{}_{}x{}".format(n_loop, nx, ny) + ".npy") if not root.exists(): root.mkdir() np.save(path, cov.cpu().detach().numpy()) return mean, cov
[docs] def mean_walsh(dataloader, device, n_loop=1): """ nloop > 1 is relevant for dataloaders with random crops such as that provided by data_loaders_ImageNet """ # Get dimensions and estimate total number of images in the dataset inputs, _ = next(iter(dataloader)) (b, c, nx, ny) = inputs.shape tot_num = len(dataloader) * b # Init n = 0 # H = wh.walsh_matrix(nx).astype(np.float32, copy=False) mean = torch.zeros((nx, ny), dtype=torch.float32) # Send to device (e.g., cuda) mean = mean.to(device) # H = torch.from_numpy(H).to(device) # Compute Mean # Accumulate sum over all images in dataset for i in range(n_loop): torch.manual_seed(i) for inputs, _ in dataloader: inputs = inputs.to(device) trans = spytorch.fwht_2d(inputs, True) mean = mean.add(torch.sum(trans, 0)) # print n = n + inputs.shape[0] print(f"Mean: {n} / (less than) {tot_num*n_loop} images", end="\n") # test # print(f' | {inputs[53,0,33,49]}', end='\n') print("", end="\n") # Normalize mean = mean / n mean = torch.squeeze(mean) return mean
[docs] def cov_walsh(dataloader, mean, device, n_loop=1): """ nloop > 1 is relevant for dataloaders with random crops such as that provided by data_loaders_ImageNet """ # Get dimensions and estimate total number of images in the dataset inputs, _ = next(iter(dataloader)) (b, c, nx, ny) = inputs.shape tot_num = len(dataloader) * b # H = wh.walsh_matrix(nx).astype(np.float32, copy=False) # H = torch.from_numpy(H).to(device) # Covariance -------------------------------------------------------------- # Init n = 0 cov = torch.zeros((nx * ny, nx * ny), dtype=torch.float32) cov = cov.to(device) # Accumulate (im - mu)*(im - mu)^T over all images in dataset for i in range(n_loop): torch.manual_seed(i) for inputs, _ in dataloader: inputs = inputs.to(device) trans = spytorch.fwht_2d(inputs, True) trans = trans - mean.repeat(inputs.shape[0], 1, 1, 1) trans = trans.reshape(inputs.shape[0], nx * ny, 1) cov = torch.addbmm(cov, trans, trans.reshape(inputs.shape[0], 1, nx * ny)) # print n += inputs.shape[0] print(f"Cov: {n} / (less than) {tot_num*n_loop} images", end="\n") # test # print(f' | {inputs[53,0,33,49]}', end='\n') print("", end="\n") # Normalize cov = cov / (n - 1) return cov
[docs] def stat_fwalsh_S(dataloader, device, root): # NOT validated! # Get dimensions and estimate total number of images in the dataset inputs, classes = next(iter(dataloader)) (b, c, nx, ny) = inputs.shape tot_num = len(dataloader) * b # 1. Mean # Init n = 0 mean = torch.zeros((nx, ny), dtype=torch.float32) # Send to device (e.g., cuda) mean = mean.to(device) ind = wh.sequency_perm_ind(nx * ny) # Accumulate sum over all images in dataset for inputs, _ in dataloader: inputs = inputs.to(device) trans = wh.fwalsh2_S_torch(inputs, ind) mean = mean.add(torch.sum(trans, 0)) # print n = n + inputs.shape[0] print(f"Mean: {n} / (less than) {tot_num} images", end="\n") print("", end="\n") # Normalize mean = mean / n mean = torch.squeeze(mean) # torch.save(mean, root+'Average_{}x{}'.format(nx,ny)+'.pth') path = root / Path("Average_{}x{}".format(nx, ny) + ".npy") if not root.exists(): root.mkdir() np.save(path, mean.cpu().detach().numpy()) # 2. Covariance # Init n = 0 cov = torch.zeros((nx * ny, nx * ny), dtype=torch.float32) cov = cov.to(device) # Accumulate (im - mu)*(im - mu)^T over all images in dataset for inputs, _ in dataloader: inputs = inputs.to(device) trans = wh.fwalsh2_S_torch(inputs, ind) trans = trans - mean.repeat(inputs.shape[0], 1, 1, 1) trans = trans.reshape(inputs.shape[0], nx * ny, 1) cov = torch.addbmm(cov, trans, trans.reshape(inputs.shape[0], 1, nx * ny)) # print n += inputs.shape[0] print(f"Cov: {n} / (less than) {tot_num} images", end="\n") print("", end="\n") # Normalize cov = cov / (n - 1) # torch.save(cov, root+'Cov_{}x{}'.format(nx,ny)+'.pth') # todo? path = root / Path("Cov_{}x{}".format(nx, ny) + ".npy") if not root.exists(): root.mkdir() np.save(path, cov.cpu().detach().numpy()) return mean, cov
[docs] def Cov2Var(Cov: np.ndarray, out_shape=None): r""" Extracts Variance Matrix from Covariance Matrix. The Variance matrix is extracted from the diagonal of the Covariance matrix. Args: Cov (np.array): Covariance matrix of shape :math:`(N_x, N_x)`. out_shape (tuple, optional): Shape of the output variance matrix. If `None`, :math:`N_x` must be a perfect square and the output is a square matrix whose shape is :math:`(\sqrt{N_x}, \sqrt{N_x})`. Default is `None`. Raises: ValueError: If the input matrix is not square. ValueError: If the output shape is not valid. Returns: np.array: Variance matrix of shape :math:`(\sqrt{N_x}, \sqrt{N_x})` or :math:`out_shape` if provided. """ row, col = Cov.shape # check Cov is square if row != col: raise ValueError("Covariance matrix must be a square matrix") if out_shape is None: out_shape = (int(np.sqrt(row)), int(np.sqrt(col))) if out_shape[0] * out_shape[1] != row: raise ValueError( f"Invalid output shape, got {out_shape} with " + f"{out_shape[0]}*{out_shape[1]} != {row}" ) # copy is necessary (see np documentation about diagonal) return np.diagonal(Cov).copy().reshape(out_shape)
[docs] def img2mask(Ord, M): """ Returns subsampling mask from order matrix """ (nx, ny) = Ord.shape msk = np.ones((nx, ny)) ranked_data = np.reshape(rankdata(-Ord, method="ordinal"), (nx, ny)) msk[np.absolute(ranked_data) > M] = 0 return msk
# todo: rewrite in a fashion similar to stat_walsh_stl10
[docs] def stat_fwalsh_S_stl10( stat_root=Path("./stats/"), data_root=Path("./data/"), img_size=64, batch_size=1024 ): """Fast Walsh S-transform of X in "2D" Args: :attr:`X` (torch.tensor): input image with shape `(*, n, n)`. `n`**2 should be a power of two. Returns: torch.tensor: S-transformed signal with shape `(*, n, n)` Examples: >>> import spyrit.misc.statistics as st >>> st.stat_fwalsh_S_stl10() """ device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") torch.manual_seed(7) # for reproductibility transform = torchvision.transforms.Compose( [ torchvision.transforms.functional.to_grayscale, torchvision.transforms.Resize((img_size, img_size)), torchvision.transforms.ToTensor(), torchvision.transforms.Normalize([0.5], [0.5]), ] ) trainset = torchvision.datasets.STL10( root=data_root, split="train+unlabeled", download=True, transform=transform ) trainloader = torch.utils.data.DataLoader( trainset, batch_size=batch_size, shuffle=True ) testset = torchvision.datasets.STL10( root=data_root, split="test", download=True, transform=transform ) testloader = torch.utils.data.DataLoader( testset, batch_size=batch_size, shuffle=False ) dataloaders = {"train": trainloader, "val": testloader} # Walsh ordered transforms time_start = time.perf_counter() stat_fwalsh_S(dataloaders["train"], device, stat_root) time_elapsed = time.perf_counter() - time_start print(time_elapsed)
# %% delete ? Deprecated ? # def stat_walsh_np(dataloader, root): # """ # Computes Mean Hadamard Image over the whole dataset + # Covariance Matrix Amongst the coefficients # """ # inputs, classes = next(iter(dataloader)) # inputs = inputs.cpu().detach().numpy() # (batch_size, channels, nx, ny) = inputs.shape # tot_num = len(dataloader)*batch_size # H1d = wh.walsh_ordered(nx) # # Abs matrix # Mean_had = abs_walsh_ordered(dataloader, H1d, tot_num) # print("Saving abs") # np.save(root / Path('Abs_{}x{}'.format(nx,ny)+'.npy'), Mean_had) # # Mean matrix # #-- Accumulate over all images in dataset # n = 0 # Mean_had = np.zeros((nx, ny)) # for inputs,_ in dataloader: # inputs = inputs.cpu().detach().numpy() # for i in range(inputs.shape[0]): # img = inputs[i,0,:,:] # h_img = wh.walsh_ordered2(img,H1d) # Mean_had += h_img # n = n+1 # print(f'Mean: {n} / (less than) {tot_num} images', end='\r') # print('', end='\n') # #-- Normalize & save # Mean_had = Mean_had/n; # print("Saving mean") # np.save(root / Path('Mean_{}x{}'.format(nx,ny)+'.npy'), Mean_had) # # Covariance matrix # n = 0 # Cov_had = np.zeros((nx*ny, nx*ny)); # for inputs,_ in dataloader: # inputs = inputs.cpu().detach().numpy(); # for i in range(inputs.shape[0]): # img = inputs[i,0,:,:]; # h_img = wh.walsh_ordered2(img, H1d); # Norm_Variable = np.reshape(h_img-Mean_had, (nx*ny,1)); # Cov_had += Norm_Variable*np.transpose(Norm_Variable); # n = n+1 # print(f'Covariance: {n} / (less than) {tot_num} images', end='\r') # print() # #-- Normalize & save # Cov_had = Cov_had/(n-1); # np.save(root / Path('Cov_{}x{}'.format(nx,ny)+'.npy'), Cov_had) # %% delete ? Deprecated ? # def abs_walsh(dataloader, device): # # Estimate tot_num # inputs, classes = next(iter(dataloader)) # #inputs = inputs.cpu().detach().numpy(); # (batch_size, channels, nx, ny) = inputs.shape; # tot_num = len(dataloader)*batch_size; # # Init # n = 0 # output = torch.zeros((nx,ny),dtype=torch.float32) # H = wh.walsh_matrix(nx).astype(np.float32, copy=False) # # Send to device (e.g., cuda) # output = output.to(device) # H = torch.from_numpy(H).to(device) # # Accumulate over all images in dataset # for inputs,_ in dataloader: # inputs = inputs.to(device); # n = n + inputs.shape[0] # trans = wh.walsh2_torch(inputs,H); # trans = torch.abs(trans) # output = output.add(torch.sum(trans,0)) # print(f'Abs: {n} / (less than) {tot_num} images', end='\n') # print('', end='\n') # #-- Normalize # output = output/n; # output = torch.squeeze(output) # return output # %% delete ? Deprecated ? # def Stat_had(dataloader, root): # """ # Computes Mean Hadamard Image over the whole dataset + # Covariance Matrix Amongst the coefficients # """ # inputs, classes = next(iter(dataloader)) # inputs = inputs.cpu().detach().numpy(); # (batch_size, channels, nx, ny) = inputs.shape; # tot_num = len(dataloader)*batch_size; # Mean_had = np.zeros((nx, ny)); # for inputs,labels in dataloader: # inputs = inputs.cpu().detach().numpy(); # for i in range(inputs.shape[0]): # img = inputs[i,0,:,:]; # H = wh.walsh_matrix(len(img)) # h_img = wh.walsh2(img,H)/len(img) # Mean_had += h_img; # Mean_had = Mean_had/tot_num; # Cov_had = np.zeros((nx*ny, nx*ny)); # for inputs,labels in dataloader: # inputs = inputs.cpu().detach().numpy(); # for i in range(inputs.shape[0]): # img = inputs[i,0,:,:]; # H = wh.walsh_matrix(len(img)) # h_img = wh.walsh2(img,H)/len(img) # Norm_Variable = np.reshape(h_img-Mean_had, (nx*ny,1)); # Cov_had += Norm_Variable*np.transpose(Norm_Variable); # Cov_had = Cov_had/(tot_num-1); # np.save(root+'Cov_{}x{}'.format(nx,ny)+'.npy', Cov_had) # np.savetxt(root+'Cov_{}x{}'.format(nx,ny)+'.txt', Cov_had) # np.save(root+'Average_{}x{}'.format(nx,ny)+'.npy', Mean_had) # np.savetxt(root+'Average_{}x{}'.format(nx,ny)+'.txt', Mean_had) # cv2.imwrite(root+'Average_{}x{}'.format(nx,ny)+'.png', Mean_had) #Needs conversion to Uint8! # return Mean_had, Cov_had # %% Keep or delete?
[docs] def optim_had(dataloader, root): """Computes image that ranks the hadamard coefficients""" inputs, classes = next(iter(dataloader)) inputs = inputs.cpu().detach().numpy() (batch_size, channels, nx, ny) = inputs.shape tot_num = len(dataloader) * batch_size Cumulated_had = np.zeros((nx, ny)) # Iterate over data. for inputs, labels in dataloader: inputs = inputs.cpu().detach().numpy() for i in range(inputs.shape[0]): img = inputs[i, 0, :, :] H = wh.walsh_matrix(len(img)) h_img = wh.walsh2(img, H) / len(img) h_img = np.abs(h_img) / tot_num Cumulated_had += h_img Cumulated_had = Cumulated_had / np.max(Cumulated_had) * 255 np.save(root + "{}x{}".format(nx, ny) + ".npy", Cumulated_had) np.savetxt(root + "{}x{}".format(nx, ny) + ".txt", Cumulated_had) im = Image.fromarray(Cumulated_had) im.save(root + "{}x{}".format(nx, ny) + ".png") return Cumulated_had
# %% What for? Still in use? Ask Antonio?
[docs] def stat_mean_coef_from_model(dataloader, device, model_exp): # A rediscuter avec Nicolas # Get dimensions and estimate total number of images in the dataset inputs, classes = next(iter(dataloader)) (ny, nh) = mdt.size() mean = torch.zeros(nh).to(device) # for inputs, _ in dataloader: inputs = inputs.to(device) trans = torch.matmul(inputs, model_exp) # .cpu() mean = mean.add(torch.sum(trans, [0, 1])) mean_vect = np.abs(np.transpose(mean.cpu().detach().numpy())) # mean = mean/mean.max() return mean_vect
[docs] def mea_abs_model(dataloader, device, model, root): # Get dimensions and estimate total number of images in the dataset inputs, classes = next(iter(dataloader)) (nx, nh) = model.size() tot_num = len(dataloader) (b, ny, nx) = inputs.size() # 1. Mean # Init n = 0 mean = torch.zeros(nh).to(device) # Accumulate sum over all images in dataset for inputs, _ in dataloader: inputs = inputs.to(device) trans = torch.abs(torch.matmul(inputs, model)) # .cpu() mean = mean.add(torch.sum(trans, [0, 1])) # print n += inputs.shape[0] # print(f'Mean: {n} / (less than) {tot_num} images', end='\n') print("", end="\n") # Normalize mean = mean / (n * ny) # print(mean.size()) mean = torch.squeeze(mean) # torch.save(mean, root+'Average_{}x{}'.format(nx,ny)+'.pth') path = root / Path("W_Average_abs_Nx{}_Nh{}".format(nx, nh) + ".npy") # if not root.exists(): # root.mkdir() np.save(path, mean.cpu().detach().numpy()) return mean
[docs] def stat_model(dataloader, device, model, root): # Get dimensions and estimate total number of images in the dataset inputs, classes = next(iter(dataloader)) (nx, nh) = model.size() tot_num = len(dataloader) (b, ny, nx) = inputs.size() # 1. Mean # Init n = 0 mean = torch.zeros(nh).to(device) # Accumulate sum over all images in dataset for inputs, _ in dataloader: inputs = inputs.to(device) trans = torch.matmul(inputs, model) # .cpu() mean = mean.add(torch.sum(trans, [0, 1])) # print n += inputs.shape[0] # print(f'Mean: {n} / (less than) {tot_num} images', end='\n') print("", end="\n") # Normalize mean = mean / (n * ny) # print(mean.size()) mean = torch.squeeze(mean) # torch.save(mean, root+'Average_{}x{}'.format(nx,ny)+'.pth') path = root / Path("W_Average_Nx{}_Nh{}".format(nx, nh) + ".npy") # if not root.exists(): # root.mkdir() np.save(path, mean.cpu().detach().numpy()) # 2. Covariance # Init n = 0 cov = torch.zeros((nh, nh), dtype=torch.float32) cov = cov.to(device) # Accumulate (im - mu)*(im - mu)^T over all images in dataset for inputs, _ in dataloader: inputs = inputs.to(device) trans = torch.matmul(inputs, model) # print(trans.size()) for i in range(ny): im_mu = trans[0, i] - mean cov += torch.matmul(im_mu.reshape(nh, 1), im_mu.reshape(1, nh)) # print n += inputs.shape[0] # print(f'Cov: {n} / (less than) {tot_num} images', end='\n') print("", end="\n") # Normalize cov = cov / ((n * ny) - 1) # torch.save(cov, root+'Cov_{}x{}'.format(nx,ny)+'.pth') # todo? path = root / Path("W_Cov_Nx{}_Nh{}".format(nx, nh) + ".npy") # if not root.exists(): # root.mkdir() np.save(path, cov.cpu().detach().numpy()) return mean, cov