Probabilistic models give a rich representation of observed data and allow
us to quantify uncertainty, detect outliers, and perform simulations.
Classic probabilistic modeling require us to model our domain with
conditional probabilities, which is not always feasible.
This is particularly true for high-dimensional data such as
images or audio.
In these scenarios, we would like to learn the data
distribution without all the modeling assumptions.
*normaizing flows* is a powerful class of models that allow us to do
just that, without resorting to approximations.
They work by learning a sequence of transformations, or
*flows*, that transforms a simple distribution to one that
fits the observed data. There are several different flavours of normalizing
flows, and in this blog article we are going to implement them using
*affine coupling layers* in PyTorch. To keep things focused, this
article will cover the theory and the model implementation, and in a
follow-up article will see how the model
works in practice by fitting it to some data.

## Normalizing flows

In a normalizing flows model we define an observed stochastic
variable \( x \in \mathbb{R}^D, x \sim p_X, \) a latent stochastic
variable \( z \in \mathbb{R}^D, z \sim p_Z \) and a bijective and differentiable function \( z = f(x): \mathbb{R}^D \mapsto
\mathbb{R}^D \) with inverse \( g = f^{-1} \). The *change of variable
formula* tells us that the (log) densities \(p_X\) and
\(p_Z\) are related through

\begin{equation}
\begin{split}
\log p_X(x) & = \log p_Z(z) & + \log \left \vert \det \frac{dz}{dx} \right \vert \iff \\\

\log p_X(x) & = \log p_Z(f(x)) & + \log \left \vert \det \frac{df(x)}{dx} \right \vert,
\end{split}
\end{equation}

where \( \frac{df(x)}{dx} \) is the Jacobian of \(f\) evaluated at
\(x\). What’s great about this formula is that given \( p_Z \) and \(
f \), we have an *exact* expression for \(p_X\) expressed in \(x\); we do not have to
resort to methods like maximizing ELBO or Monte Carlo sampling, but
can instead train by directly maximizing \(\log p_X(x)\).
We will use this result to design an algorithm that can
learn \( p_X \) from data, which means we have to specify \( p_Z \)
and \( f \).
For our purposes \( p_Z \) can be any distribution we can
evaluate the pdf of and sample from, and it is common to let
\( p_Z = \mathcal{N}\). \(f\) on the other hand, will be the part of
the model we learn from data. However, we can not learn just any \(f\).
This is because computing \( \det \frac{df(x)}{dx} \) is \(
\mathcal{O}(D^3) \) in the general case, so a naive design (i.e. just
plugging a neural network in there) would make
inference intractable for high-dimensional data.
Fortunately, this complexity can be reduced by limiting the interactions between dimensions in
\( f \), but this has the unwanted side-effect of reducing models
expressivity, making it worse at modeling complex distributions. This
trade-off hints of the central problem in normalizing flows
research: How can we design expressive bijections \( f \) with cheap-to-compute Jacobians?
It turns out that this is much easier if we decompose \( f \)
into \(n\) smaller functions or *flows*
\( f = f_n \circ \dots \circ f_2 \circ f_1 \).
The Jacobian for function composition is given by

\begin{equation} \frac{df(x)}{dx} = \prod_{i=1}^n \frac{df_i(x_{i-1})}{dx_{i-1}} \end{equation}

which means its log determinant is

\begin{equation} \log \left \vert \det \frac{df(x)}{dx} \right \vert = \log \left \vert \det \prod_{i=1}^n \frac{df_i(x_{i-1})}{dx_{i-1}} \right \vert = \sum_{i=1}^n \log \left \vert \det \frac{df_i(x_{i-1})}{dx_{i-1}} \right \vert, \end{equation}

where \( x_{i} \) denotes the output of the \(i\):th function and \(x_0 = x\). For \(f\) to be a differentiable bijection, we require every \(f_i\) to be differentiable and bijective, but apart from that we can choose the flows as we please.

Since the Jacobian only depends on the output of the previous flow, we can compute it alongside \(f_i(x)\) with no additional overhead. We will see how to design the flows \(f_i\) shortly, but lets take a top-down view when implementing this and start with a class for the entire normalizing flow.

```
class NormalizingFlow(nn.Module):
def __init__(self, latent: Distribution, flows: List[nn.Module]):
super(NormalizingFlow, self).__init__()
self.latent = latent
self.flows = flows
def latent_log_prob(self, z: torch.Tensor) -> torch.Tensor:
return self.latent.log_prob(z)
def latent_sample(self, num_samples: int = 1) -> torch.Tensor:
return self.latent.sample((num_samples,))
def sample(self, num_samples: int = 1) -> torch.Tensor:
'''Sample a new observation x by sampling z from
the latent distribution and pass through g.'''
return self.g(self.latent_sample(num_samples))
def f(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
'''Maps observation x to latent variable z.
Additionally, computes the log determinant
of the Jacobian for this transformation.
Inveres of g.'''
z, sum_log_abs_det = x, torch.ones(x.size(0)).to(x.device)
for flow in self.flows:
z, log_abs_det = flow.f(z)
sum_log_abs_det += log_abs_det
return z, sum_log_abs_det
def g(self, z: torch.Tensor) -> torch.Tensor:
'''Maps latent variable z to observation x.
Inverse of f.'''
with torch.no_grad():
x = z
for flow in reversed(self.flows):
x = flow.g(x)
return x
def g_steps(self, z: torch.Tensor) -> List[torch.Tensor]:
'''Maps latent variable z to observation x
and stores intermediate results.'''
xs = [z]
for flow in reversed(self.flows):
xs.append(flow.g(xs[-1]))
return xs
def log_prob(self, x: torch.Tensor) -> torch.Tensor:
'''Computes log p(x) using the change of variable formula.'''
z, log_abs_det = self.f(x)
return self.latent_log_prob(z) + log_abs_det
def __len__(self) -> int:
return len(self.flows)
```

You can ignore the convenience methods in this class for now, it is in the functions `f`

that the magic happens. Given an
observation \(x\) we pass it through the composition of flows while
simultaneously computing the log determinant of the Jacobian. With
this implementation, training the model can be done by maximizing
`log_prob`

. `g`

is simply implemented as the reverse flow, and here we
do not need to compute the log determinant of the Jacobian.

And with that we can turn our attention to the individual flows \( f_i
\). One design that has proven successful is *coupling layers*.

## Coupling layers

Coupling layers is a fairly popular approach and designs \( f_i \) by first ordering the data dimensions, and then splitting them into two parts \( x_{\leq d}, x_{>d} = x, \) \(1 < d < D \). \( f_i \) is then given by

\begin{equation}
f_i(x_j) =
\begin{cases}
x_j & \quad j \leq d\\\

\tau_i(x_j; \theta_i(x_{\leq d})) & \quad j > d
\end{cases}
\end{equation}

where \( \tau_i \) is often referred to as the *transformer*, and \( \theta_i \) the
*conditioner*. This is a somewhat unfortunate naming since an
unrelated but *fairly popular* paper also introduced a Transformer, so to
avoid confusion we will refer to \( \tau_i \) as the *coupling
function* instead. We are going to pick a specific function class for
the coupling function, and then learn the conditioner from data.

When splitting the data \(d\) is commonly chosen to be \(d = \frac{D}{2}\), but how the dimensions are ordered varies. If the data has no natural order, we can always impose our own, and if we know the data has some specific structure we could create specific patterns. For instance, a checkers pattern can be used for image data to capture the locality of pixels. At this point you might be wondering why we are mapping half of the dimensions over the identity function. Indeed, we cannot model particularly interesting densities by only transforming half of the dimensions. The reason we are doing this is to limit the interactions between dimensions and avoid the dreaded cubic complexity. Given this formulation the Jacobian is given by the triangular block matrix

\begin{equation*}
\frac{\partial f_i}{\partial x} =
\begin{pmatrix}
I & 0 \\\

\frac{\partial \tau_i}{\partial x_{>d}} & \frac{\partial \tau_i}{\partial x_{>d}}
\end{pmatrix}
\end{equation*}

with log determinant

\[ \log \left \vert \det \frac{df_i(x)}{dx} \right \vert = \log \prod_{j=d}^D \left \vert \frac{d\tau_i(x_j)}{dx_j} \right \vert = \sum_{j=d}^D \log \left \vert \frac{d\tau_i(x_j)}{dx_j} \right \vert \]

which can be computed in \( \mathcal{O}(D) \). To make sure all dimensions can be transformed we switch places of the dimensions between each coupling layer such that they take turns to be transformed.

Since every element in the determinant depends on \( \tau_i \), it is desirable that \( \tau_i \) is fast to evaluate and has a simple derivative, preferably on closed form. While a more expressive \(\tau_i\) makes for a more expressive model overall, we can get away with quite simple transformations. If we need to increase the model’s expressivity we can simply increase the number of flows.

### Affine coupling layers

One choice of coupling function is a simple affine transformation \( \tau_i(x) = \exp(s_i) x + t_i \), where \(s_i\) is exponentiated to make it non-zero and guarantee invertibility. This transformation can be computed efficiently and has a simple derivative \( \frac{d}{dx} \left ( \exp(s_i) x + t_i \right ) = \exp(s_i) \) which gives the pleasant log Jacobian determinant

\[ \log \left \vert \det \frac{df_i(x)}{dx} \right \vert = \sum_{j=d}^D \log \left \vert \frac{d\tau_i(x_j)}{dx_j} \right \vert = \sum_{j=d}^D \log \left \vert \exp(s_i^{(j)}) \right \vert = \sum_{j=d}^D s_i^{(j)} \]

where we use superscript to index elements in \(s_i\). We implement this as follows.

```
class AffineCouplingLayer(nn.Module):
def __init__(
self,
theta: nn.Module,
split: Callable[[torch.Tensor], Tuple[torch.Tensor, torch.Tensor]]
):
super(AffineCouplingLayer, self).__init__()
self.theta = theta
self.split = split
def f(self, x: torch.Tensor) -> torch.Tensor:
'''f : x -> z. The inverse of g.'''
x2, x1 = self.split(x)
t, s = self.theta(x1)
z1, z2 = x1, x2 * torch.exp(s) + t
log_det = s.sum(-1)
return torch.cat((z1, z2), dim=-1), log_det
def g(self, z: torch.Tensor) -> torch.Tensor:
'''g : z -> x. The inverse of f.'''
z1, z2 = self.split(z)
t, s = self.theta(z1)
x1, x2 = z1, (z2 - t) * torch.exp(-s)
return torch.cat((x2, x1), dim=-1)
```

### The conditioner

The final piece of the model is the conditioner. The rest of the model has been carefully designed to adhere to the math, but we obviously need to fit it to observed data. We do this by learning the conditioner function, which in the case of affine coupling layers learns a function that outputs the vectors \(s_i\) and \(t_i\).

Since the Jacobian only depends on \( \tau_i \), we can make \( \theta_i \) as complex as we want without impacting the computational complexity of computing the Jacobian determinant. And what do we do when we want to learn a differentiable complex function? We stick a neural network in there! Depending on what your data looks like you might want to use a specific architecture, but for now we will settle for a simple fully connected network. Learning the parameters for a single flow is a much simpler problem than learning a function for an entire classification or regression problem as is typically done with neural networks. Hence, the conditioner can be significantly simpler and smaller than popular networks such as ResNet or BERT. We will see this in the follow-up article.

```
class Conditioner(nn.Module):
def __init__(
self, in_dim: int, out_dim: int,
num_hidden: int, hidden_dim: int,
num_params: int
):
super(Conditioner, self).__init__()
self.input = nn.Linear(in_dim, hidden_dim)
self.hidden = nn.ModuleList([
nn.Linear(hidden_dim, hidden_dim)
for _ in range(num_hidden)
])
self.num_params = num_params
self.out_dim = out_dim
self.dims = nn.Linear(hidden_dim, out_dim*num_params)
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = F.leaky_relu(self.input(x))
for h in self.hidden:
x = F.leaky_relu(h(x))
batch_params = self.dims(x).reshape(x.size(0), self.out_dim, -1)
params = batch_params.chunk(self.num_params, dim=-1)
return [p.squeeze(-1) for p in params]
```

And with that we have specified the entire model! Compared to a network designed for a regression or classification problem there are quite a few design choices involved in a normalizing flows model. Let’s take a moment to recap the steps we went through.

- We want to learn the relationship between \(p(x)\) and \(p(z)\)
- This means learning the bijection \(z = f(x)\)
- \(f\) is decomposed into \( f = f_n \circ f_{n-1} \circ \dots \circ f_1 \)
- \(f_i\) is modeled using coupling layers with coupling function \(\tau_i\) and conditioner \(\theta_i\)
- We let \(\tau_i\) be an affine transformation \(\tau_i(x) = \exp(s_i)x + t_i\)
- We learn the parameters \(\left (s_i, t_i \right ) = \theta_i \) with a neural network

Since everything is differentiable by design and we have an expression for \(p(x)\) we can train this model by maximum likelihood using SGD just like any other architecture. And with that, it is time to wrap up.

## Ending notes

In this article we have covered the core theory of normalizing flows, and seen how to implement a particular flavour of them using affine coupling layers. If you want to dig deeper into normalizing flows for probabilistic modeling I recommend this review paper, which does a good job covering the field. While this article covered techniques that are no longer state-of-the-art, it should give you enough background to dig in to more advanced approaches such as Flow++ or FFJORD.

In the follow-up article we will put this implementation to use and fit it to some actual data.

Thank you for reading, I hope you found it interesting. If you have any comments or questions, don’t hesitate to reach out to me.