from typing import Optional, Tuple, Union
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import Tensor
from torch.nn import Parameter
from torch_geometric.nn.conv import MessagePassing
from torch_geometric.nn.dense.linear import Linear
from torch_geometric.typing import Adj, NoneType, OptPairTensor, OptTensor, Size
from torch_geometric.utils import add_self_loops, remove_self_loops, softmax
class GATConv(MessagePassing):
    r"""The graph attentional operator from the `"Graph Attention Networks"
    <https://arxiv.org/abs/1710.10903>`_ paper

    .. math::
        \mathbf{x}^{\prime}_i = \alpha_{i,i}\mathbf{\Theta}\mathbf{x}_{i} +
        \sum_{j \in \mathcal{N}(i)} \alpha_{i,j}\mathbf{\Theta}\mathbf{x}_{j},

    where the attention coefficients :math:`\alpha_{i,j}` are computed as

    .. math::
        \alpha_{i,j} =
        \frac{
        \exp\left(\mathrm{LeakyReLU}\left(\mathbf{a}^{\top}
        [\mathbf{\Theta}\mathbf{x}_i \, \Vert \, \mathbf{\Theta}\mathbf{x}_j]
        \right)\right)}
        {\sum_{k \in \mathcal{N}(i) \cup \{ i \}}
        \exp\left(\mathrm{LeakyReLU}\left(\mathbf{a}^{\top}
        [\mathbf{\Theta}\mathbf{x}_i \, \Vert \, \mathbf{\Theta}\mathbf{x}_k]
        \right)\right)}.

    Args:
        in_channels (int or tuple): Size of each input sample, or :obj:`-1` to
            derive the size from the first input(s) to the forward method.
            A tuple corresponds to the sizes of source and target
            dimensionalities.
        out_channels (int): Size of each output sample.
        heads (int, optional): Number of multi-head-attentions.
            (default: :obj:`1`)
        concat (bool, optional): If set to :obj:`False`, the multi-head
            attentions are averaged instead of concatenated.
            (default: :obj:`True`)
        negative_slope (float, optional): LeakyReLU angle of the negative
            slope. (default: :obj:`0.2`)
        dropout (float, optional): Dropout probability of the normalized
            attention coefficients which exposes each node to a stochastically
            sampled neighborhood during training. (default: :obj:`0`)
        add_self_loops (bool, optional): If set to :obj:`False`, will not add
            self-loops to the input graph. (default: :obj:`True`)
        bias (bool, optional): If set to :obj:`False`, the layer will not learn
            an additive bias. (default: :obj:`True`)
        **kwargs (optional): Additional arguments of
            :class:`torch_geometric.nn.conv.MessagePassing`.
    """
def __init__(
self,
in_channels: Union[int, Tuple[int, int]],
out_channels: int,
heads: int = 1,
concat: bool = True,
negative_slope: float = 0.2,
dropout: float = 0.0,
add_self_loops: bool = True,
bias: bool = True,
**kwargs
):
kwargs.setdefault("aggr", "add")
super(GATConv, self).__init__(node_dim=0, **kwargs)
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.heads = heads
        self.concat = concat
        self.negative_slope = negative_slope
        self.dropout = dropout
        self.add_self_loops = add_self_loops
# In case we are operating in bipartite graphs, we apply separate
# transformations 'lin_src' and 'lin_dst' to source and target nodes:
# if isinstance(in_channels, int):
# self.lin_src = Linear(in_channels, heads * out_channels,
# bias=False, weight_initializer='glorot')
# self.lin_dst = self.lin_src
# else:
# self.lin_src = Linear(in_channels[0], heads * out_channels, False,
# weight_initializer='glorot')
# self.lin_dst = Linear(in_channels[1], heads * out_channels, False,
# weight_initializer='glorot')
        # Source and target transformations share a single weight matrix of
        # shape [in_channels, heads * out_channels]; this shared-matrix
        # variant therefore assumes an integer ``in_channels``:
        self.lin_src = nn.Parameter(
            torch.zeros(size=(in_channels, heads * out_channels)))
        nn.init.xavier_normal_(self.lin_src.data, gain=1.414)
        self.lin_dst = self.lin_src
# The learnable parameters to compute attention coefficients:
        self.att_src = Parameter(torch.Tensor(1, heads, out_channels))
        self.att_dst = Parameter(torch.Tensor(1, heads, out_channels))
nn.init.xavier_normal_(self.att_src.data, gain=1.414)
nn.init.xavier_normal_(self.att_dst.data, gain=1.414)
# if bias and concat:
# self.bias = Parameter(torch.Tensor(heads * out_channels))
# elif bias and not concat:
# self.bias = Parameter(torch.Tensor(out_channels))
# else:
# self.register_parameter('bias', None)
self._alpha = None
# self.reset_parameters()
# def reset_parameters(self):
# self.lin_src.reset_parameters()
# self.lin_dst.reset_parameters()
# glorot(self.att_src)
# glorot(self.att_dst)
# # zeros(self.bias)
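    # A minimal working ``reset_parameters`` (a sketch; the original one is
    # commented out above): it simply repeats the Xavier initialization from
    # ``__init__``. Since ``lin_dst`` aliases ``lin_src``, re-initializing
    # ``lin_src`` covers both weight matrices.
    def reset_parameters(self):
        nn.init.xavier_normal_(self.lin_src.data, gain=1.414)
        nn.init.xavier_normal_(self.att_src.data, gain=1.414)
        nn.init.xavier_normal_(self.att_dst.data, gain=1.414)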
    def forward(
self,
x: Union[Tensor, OptPairTensor],
edge_index: Adj,
size: Size = None,
return_attention_weights=None,
attention=True,
tied_attention=None,
):
r"""
Args:
return_attention_weights (bool, optional): If set to :obj:`True`,
will additionally return the tuple
:obj:`(edge_index, attention_weights)`, holding the computed
attention weights for each edge. (default: :obj:`None`)
"""
H, C = self.heads, self.out_channels
# We first transform the input node features. If a tuple is passed, we
# transform source and target node features via separate weights:
if isinstance(x, Tensor):
assert x.dim() == 2, "Static graphs not supported in 'GATConv'"
# x_src = x_dst = self.lin_src(x).view(-1, H, C)
x_src = x_dst = torch.mm(x, self.lin_src).view(-1, H, C)
else: # Tuple of source and target node features:
x_src, x_dst = x
assert x_src.dim() == 2, "Static graphs not supported in 'GATConv'"
            # ``lin_src`` / ``lin_dst`` are plain weight matrices here, so
            # apply them with a matrix multiplication (``lin_dst`` is
            # ``lin_src``):
            x_src = torch.mm(x_src, self.lin_src).view(-1, H, C)
            if x_dst is not None:
                x_dst = torch.mm(x_dst, self.lin_dst).view(-1, H, C)
x = (x_src, x_dst)
if not attention:
return x[0].mean(dim=1)
# return x[0].view(-1, self.heads * self.out_channels)
        if tied_attention is None:
# Next, we compute node-level attention coefficients, both for source
# and target nodes (if present):
alpha_src = (x_src * self.att_src).sum(dim=-1)
alpha_dst = None if x_dst is None else (x_dst * self.att_dst).sum(-1)
alpha = (alpha_src, alpha_dst)
self.attentions = alpha
else:
alpha = tied_attention
        # torch_sparse is imported lazily here to keep it an optional
        # dependency:
        from torch_sparse import SparseTensor, set_diag
if self.add_self_loops:
if isinstance(edge_index, Tensor):
# We only want to add self-loops for nodes that appear both as
# source and target nodes:
num_nodes = x_src.size(0)
if x_dst is not None:
num_nodes = min(num_nodes, x_dst.size(0))
num_nodes = min(size) if size is not None else num_nodes
edge_index, _ = remove_self_loops(edge_index)
edge_index, _ = add_self_loops(edge_index, num_nodes=num_nodes)
elif isinstance(edge_index, SparseTensor):
edge_index = set_diag(edge_index)
# propagate_type: (x: OptPairTensor, alpha: OptPairTensor)
out = self.propagate(edge_index, x=x, alpha=alpha, size=size)
alpha = self._alpha
assert alpha is not None
self._alpha = None
if self.concat:
out = out.view(-1, self.heads * self.out_channels)
else:
out = out.mean(dim=1)
# if self.bias is not None:
# out += self.bias
if isinstance(return_attention_weights, bool):
if isinstance(edge_index, Tensor):
return out, (edge_index, alpha)
elif isinstance(edge_index, SparseTensor):
return out, edge_index.set_value(alpha, layout="coo")
else:
return out
    def message(
self, x_j: Tensor, alpha_j: Tensor, alpha_i: OptTensor, index: Tensor, ptr: OptTensor, size_i: Optional[int]
) -> Tensor:
        # Given edge-level attention coefficients for source and target nodes,
        # we simply need to sum them up to "emulate" concatenation:
alpha = alpha_j if alpha_i is None else alpha_j + alpha_i
        # NOTE: unlike the reference GAT operator (and the docstring above),
        # this variant scores edges with a sigmoid instead of a LeakyReLU:
        # alpha = F.leaky_relu(alpha, self.negative_slope)
        alpha = torch.sigmoid(alpha)
alpha = softmax(alpha, index, ptr, size_i)
self._alpha = alpha # Save for later use.
alpha = F.dropout(alpha, p=self.dropout, training=self.training)
return x_j * alpha.unsqueeze(-1)
    def __repr__(self):
return "{}({}, {}, heads={})".format(self.__class__.__name__, self.in_channels, self.out_channels, self.heads)
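

# A minimal usage sketch (not part of the original module): build a small
# random graph and run the modified GATConv once. All sizes below are
# arbitrary illustration values.
if __name__ == "__main__":
    num_nodes, in_dim, out_dim, heads = 10, 16, 32, 4
    x = torch.randn(num_nodes, in_dim)
    edge_index = torch.randint(0, num_nodes, (2, 40))
    conv = GATConv(in_dim, out_dim, heads=heads)
    out, (edge_index_with_loops, alpha) = conv(
        x, edge_index, return_attention_weights=True)
    print(out.shape)    # [num_nodes, heads * out_dim] since concat=True
    print(alpha.shape)  # [num_edges_after_adding_self_loops, heads]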