from __future__ import division from __future__ import unicode_literals from typing import Iterable, Optional import weakref import copy import torch # Partially based on: # https://github.com/tensorflow/tensorflow/blob/r1.13/tensorflow/python/training/moving_averages.py class ExponentialMovingAverage: """ Maintains (exponential) moving average of a set of parameters. Args: parameters: Iterable of `torch.nn.Parameter` (typically from `model.parameters()`). decay: The exponential decay. use_num_updates: Whether to use number of updates when computing averages. """ def __init__( self, parameters: Iterable[torch.nn.Parameter], decay: float, use_num_updates: bool = True ): if decay < 0.0 or decay > 1.0: raise ValueError('Decay must be between 0 and 1') self.decay = decay self.num_updates = 0 if use_num_updates else None parameters = list(parameters) self.shadow_params = [p.clone().detach() for p in parameters if p.requires_grad] self.collected_params = [] # By maintaining only a weakref to each parameter, # we maintain the old GC behaviour of ExponentialMovingAverage: # if the model goes out of scope but the ExponentialMovingAverage # is kept, no references to the model or its parameters will be # maintained, and the model will be cleaned up. self._params_refs = [weakref.ref(p) for p in parameters] def _get_parameters( self, parameters: Optional[Iterable[torch.nn.Parameter]] ) -> Iterable[torch.nn.Parameter]: if parameters is None: parameters = [p() for p in self._params_refs] if any(p is None for p in parameters): raise ValueError( "(One of) the parameters with which this " "ExponentialMovingAverage " "was initialized no longer exists (was garbage collected);" " please either provide `parameters` explicitly or keep " "the model to which they belong from being garbage " "collected." ) return parameters else: return parameters def update( self, parameters: Optional[Iterable[torch.nn.Parameter]] = None ) -> None: """ Update currently maintained parameters. Call this every time the parameters are updated, such as the result of the `optimizer.step()` call. Args: parameters: Iterable of `torch.nn.Parameter`; usually the same set of parameters used to initialize this object. If `None`, the parameters with which this `ExponentialMovingAverage` was initialized will be used. """ parameters = self._get_parameters(parameters) decay = self.decay if self.num_updates is not None: self.num_updates += 1 decay = min( decay, (1 + self.num_updates) / (10 + self.num_updates) ) one_minus_decay = 1.0 - decay with torch.no_grad(): parameters = [p for p in parameters if p.requires_grad] for s_param, param in zip(self.shadow_params, parameters): tmp = (s_param - param) # tmp will be a new tensor so we can do in-place tmp.mul_(one_minus_decay) s_param.sub_(tmp) def copy_to( self, parameters: Optional[Iterable[torch.nn.Parameter]] = None ) -> None: """ Copy current parameters into given collection of parameters. Args: parameters: Iterable of `torch.nn.Parameter`; the parameters to be updated with the stored moving averages. If `None`, the parameters with which this `ExponentialMovingAverage` was initialized will be used. """ parameters = self._get_parameters(parameters) for s_param, param in zip(self.shadow_params, parameters): if param.requires_grad: param.data.copy_(s_param.data) def store( self, parameters: Optional[Iterable[torch.nn.Parameter]] = None ) -> None: """ Save the current parameters for restoring later. Args: parameters: Iterable of `torch.nn.Parameter`; the parameters to be temporarily stored. If `None`, the parameters of with which this `ExponentialMovingAverage` was initialized will be used. """ parameters = self._get_parameters(parameters) self.collected_params = [param.clone() for param in parameters if param.requires_grad] def restore( self, parameters: Optional[Iterable[torch.nn.Parameter]] = None ) -> None: """ Restore the parameters stored with the `store` method. Useful to validate the model with EMA parameters without affecting the original optimization process. Store the parameters before the `copy_to` method. After validation (or model saving), use this to restore the former parameters. Args: parameters: Iterable of `torch.nn.Parameter`; the parameters to be updated with the stored parameters. If `None`, the parameters with which this `ExponentialMovingAverage` was initialized will be used. """ parameters = self._get_parameters(parameters) for c_param, param in zip(self.collected_params, parameters): if param.requires_grad: param.data.copy_(c_param.data) def state_dict(self) -> dict: r"""Returns the state of the ExponentialMovingAverage as a dict.""" # Following PyTorch conventions, references to tensors are returned: # "returns a reference to the state and not its copy!" - # https://pytorch.org/tutorials/beginner/saving_loading_models.html#what-is-a-state-dict return { "decay": self.decay, "num_updates": self.num_updates, "shadow_params": self.shadow_params, "collected_params": self.collected_params } def load_state_dict(self, state_dict: dict) -> None: r"""Loads the ExponentialMovingAverage state. Args: state_dict (dict): EMA state. Should be an object returned from a call to :meth:`state_dict`. """ # deepcopy, to be consistent with module API state_dict = copy.deepcopy(state_dict) self.decay = state_dict["decay"] if self.decay < 0.0 or self.decay > 1.0: raise ValueError('Decay must be between 0 and 1') self.num_updates = state_dict["num_updates"] assert self.num_updates is None or isinstance(self.num_updates, int), \ "Invalid num_updates" # Consistant with torch.optim.Optimizer, cast things to current # device and dtype if len(self.shadow_params) > 0: device = self.shadow_params[0].device dtype = self.shadow_params[0].dtype else: device = None dtype = None self.shadow_params = state_dict["shadow_params"] assert isinstance(self.shadow_params, list), \ "shadow_params must be a list" assert all( isinstance(p, torch.Tensor) for p in self.shadow_params ), "shadow_params must all be Tensors" # Cast shadow params: if device is not None: self.shadow_params = [ p.to(device=device, dtype=dtype) if p.is_floating_point() else p.to(device=device) for p in self.shadow_params ] self.collected_params = state_dict["collected_params"] assert isinstance(self.collected_params, list), \ "collected_params must be a list" assert all( isinstance(p, torch.Tensor) for p in self.collected_params ), "collected_params must all be Tensors" # Cast collected params: if device is not None: self.collected_params = [ p.to(device=device, dtype=dtype) if p.is_floating_point() else p.to(device=device) for p in self.collected_params ]