Data Layer
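The main goal of the data layer is to convert the numpy data returned by GraphLearn graph operations into TensorFlow 1.x tensors. We use the `from_generator` interface of `tf.data.Dataset` to perform this conversion. In addition, to handle node and edge features uniformly, we provide feature-processing interfaces. They turn raw continuous, categorical, and multi-value categorical features into dense vectors that serve as input to the model. The rest of this section follows the order in which data is processed:
- Basic data objects: the basic data objects used by the model layer.
- Tensor data objects: the data objects in TensorFlow tensor format.
- Feature processing: the interfaces for processing different kinds of raw features.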
Basic data objects
Corresponding source: `nn/data.py`, `nn/subgraph.py`, `nn/dataset.py`
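Each GraphLearn graph operation returns a `Nodes` or `Edges` object holding numpy ndarrays. To simplify processing in the model layer, we represent both `Nodes` and `Edges` with a single `Data` object. The result of a GSL query can therefore be expressed as a `Data` dict, where each key is an alias and each value is the corresponding `Data` object.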
Data
```python
class Data(object):
    """A plain object modeling a batch of `Nodes` or `Edges`."""
    def __init__(self,
                 ids=None,
                 ints=None,
                 floats=None,
                 strings=None,
                 labels=None,
                 weights=None,
                 **kwargs):
        """ ints, floats and strings are attributes in numpy or Tensor format
        with the shape
        [batch_size, int_attr_num],
        [batch_size, float_attr_num],
        [batch_size, string_attr_num].
        labels and weights are in numpy or Tensor format with the shape
        [batch_size], [batch_size].
        The data object can be extended by any other additional data.
        """
        self.ids = ids
        self.int_attrs = ints
        self.float_attrs = floats
        self.string_attrs = strings
        self.labels = labels
        self.weights = weights
        # Extra data passed as keyword arguments becomes plain attributes.
        for key, item in kwargs.items():
            self[key] = item
        self._handler_dict = {}

    def apply(self, func):
        """Applies the function `func` to all attributes.
        """
        for k, v in self.__dict__.items():
            if v is not None and k[:2] != '__' and k[-2:] != '__':
                self.__dict__[k] = func(v)
        return self

    # Item access delegates to attributes, which `self[key] = item` relies on.
    def __getitem__(self, key):
        return getattr(self, key, None)

    def __setitem__(self, key, value):
        setattr(self, key, value)
```
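The snippet below is a minimal, self-contained sketch of how a GSL result could be held as a `Data` dict and transformed in place with `apply`. `MiniData` is a hypothetical stand-in that copies only the fields and the `apply` logic shown above (it is not the library class), and the aliases `"u"` and `"i"` are made up for illustration.

```python
import numpy as np


class MiniData(object):
    """Hypothetical stand-in for `Data`: same attribute names and `apply`."""

    def __init__(self, ids=None, ints=None, floats=None, strings=None,
                 labels=None, weights=None):
        self.ids = ids
        self.int_attrs = ints
        self.float_attrs = floats
        self.string_attrs = strings
        self.labels = labels
        self.weights = weights

    def apply(self, func):
        # Apply `func` to every non-None attribute, as `Data.apply` does.
        for k, v in self.__dict__.items():
            if v is not None and k[:2] != '__' and k[-2:] != '__':
                self.__dict__[k] = func(v)
        return self


batch_size = 4
# A GSL result as a dict: key = alias, value = one Data object per alias.
data_dict = {
    "u": MiniData(ids=np.arange(batch_size, dtype=np.int64),
                  ints=np.zeros((batch_size, 2), dtype=np.int64),      # [batch_size, int_attr_num]
                  floats=np.ones((batch_size, 3), dtype=np.float64)),  # [batch_size, float_attr_num]
    "i": MiniData(ids=np.array([10, 11, 12, 13], dtype=np.int64),
                  floats=np.full((batch_size, 1), 0.5)),
}

# Keep only the first two rows of every attribute of every alias.
for alias, data in data_dict.items():
    data.apply(lambda arr: arr[:2])
```

Because `apply` skips `None` fields, attributes that a query did not produce (here `string_attrs`) stay `None`.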
SubGraph
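To make GNN modeling easier, we use `SubGraph` to represent a sampled subgraph. It consists of `edge_index`, `nodes`, and `edges`. Because GSL does not currently support sampling subgraphs directly, an `inducer` must be supplied explicitly when generating a `SubGraph`.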
```python
class SubGraph(object):
    """ `SubGraph` is a basic data structure used to describe a sampled
    subgraph. It consists of `edge_index`, nodes `Data` and edges `Data`.
    Args:
      edge_index: A np.ndarray object with shape [2, num_edges], which indicates
        [rows, cols] of SubGraph.
      nodes: A `Data` object denoting the input nodes.
      edges: A `Data` object denoting the input edges.
    Note that this object can be extended by any other additional data.
    """
    def __init__(self, edge_index, nodes, edges=None, **kwargs):
        self._edge_index = edge_index
        self._nodes = nodes
        self._edges = edges
        for key, item in kwargs.items():
            self[key] = item

    @property
    def num_nodes(self):
        return self._nodes.ids.size

    @property
    def num_edges(self):
        return self._edge_index.shape[1]

    @property
    def nodes(self):
        return self._nodes

    @property
    def edge_index(self):
        return self._edge_index

    @property
    def edges(self):
        return self._edges

    def __getitem__(self, key):
        return getattr(self, key, None)

    def __setitem__(self, key, value):
        setattr(self, key, value)
```
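`SubGraph` stores `edge_index` in local node positions, so an inducer has to relabel global node ids into positions within `nodes`. The sketch below shows one way to do that with `np.unique(..., return_inverse=True)`. `induce_edge_index` is a hypothetical helper: it is not the inducer interface used in examples/tf/sage, and it works on plain id arrays instead of `Data` objects.

```python
import numpy as np


def induce_edge_index(src_ids, dst_ids):
    """Hypothetical helper: relabel sampled (src, dst) id pairs into a local
    edge_index with shape [2, num_edges], plus the unique node ids."""
    src_ids = np.asarray(src_ids, dtype=np.int64)
    dst_ids = np.asarray(dst_ids, dtype=np.int64)
    all_ids = np.concatenate([src_ids, dst_ids])
    # node_ids is sorted and unique; local[k] is the position of all_ids[k].
    node_ids, local = np.unique(all_ids, return_inverse=True)
    rows = local[:src_ids.size]
    cols = local[src_ids.size:]
    edge_index = np.stack([rows, cols], axis=0)
    return node_ids, edge_index


# One-hop sample: node 7 -> {3, 9}, node 3 -> {9}.
node_ids, edge_index = induce_edge_index([7, 7, 3], [3, 9, 9])
num_nodes = node_ids.size          # what SubGraph.num_nodes reports
num_edges = edge_index.shape[1]    # what SubGraph.num_edges reports
```

Here `node_ids` is `[3, 7, 9]` and `edge_index` is `[[1, 1, 0], [0, 2, 2]]`, so `node_ids[edge_index[0]]` recovers the original source ids.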
HeteroSubGraph (since v1.1.0)
```python
class HeteroSubGraph(object):
    """ A data object describing heterogeneous `SubGraph`.
    Different types of nodes and edges are represented by a dict.
    Args:
      edge_index_dict: A dict of np.ndarray objects. Each key is a tuple of
        (src_type, edge_type, dst_type) and each value indicates [rows, cols].
      nodes_dict: A dict of `Data`/ndarray objects denoting different types of nodes.
      edges_dict: A dict of `Data`/ndarray objects denoting different types of edges.
    Examples:
      For meta-path "user-click-item", the HeteroSubGraph may be created as follows:
        edge_index_dict[('user', 'click', 'item')] = np.array([[0,1,2], [2,3,4]])
        edges_dict[('user', 'click', 'item')] = Data(...)
        nodes_dict['user'] = Data(...)
        nodes_dict['item'] = Data(...)
        hg = HeteroSubGraph(edge_index_dict, nodes_dict, edges_dict)
    """
    def __init__(self, edge_index_dict, nodes_dict, edges_dict=None, **kwargs):
        self._edge_index_dict = edge_index_dict
        self._nodes_dict = nodes_dict
        self._edges_dict = edges_dict
        for key, item in kwargs.items():
            self[key] = item

    def num_nodes(self, node_type):
        if isinstance(self._nodes_dict[node_type], Data):
            return self._nodes_dict[node_type].ids.size
        else:
            return self._nodes_dict[node_type].size

    def num_edges(self, edge_type):
        return self._edge_index_dict[edge_type].shape[1]

    @property
    def nodes_dict(self):
        return self._nodes_dict

    @property
    def edge_index_dict(self):
        return self._edge_index_dict

    @property
    def edges_dict(self):
        return self._edges_dict

    @property
    def keys(self):
        r"""Returns all names of graph attributes."""
        keys = [key for key in self.__dict__.keys() if self[key] is not None]
        keys = [key for key in keys if key[:2] != '__' and key[-2:] != '__']
        return keys

    @property
    def node_types(self):
        """Returns all node types of the heterogeneous subgraph."""
        return list(self._nodes_dict.keys())

    @property
    def edge_types(self):
        """Returns all edge types of the heterogeneous subgraph."""
        return list(self._edge_index_dict.keys())

    def __getitem__(self, key):
        return getattr(self, key, None)

    def __setitem__(self, key, value):
        setattr(self, key, value)
```
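For the `user-click-item` meta-path in the docstring above, each node type gets its own local numbering, so relabeling is done per type and the `edge_index_dict` key carries both endpoint types. This sketch uses plain ndarrays as the `nodes_dict` values (which the docstring allows) and a hypothetical helper `induce_bipartite`; it is an illustration, not the bipartite inducer from examples/tf/bipartite_sage.

```python
import numpy as np


def induce_bipartite(user_ids, item_ids, edge_type=('user', 'click', 'item')):
    """Hypothetical helper: build nodes_dict and edge_index_dict for one
    user->item relation, numbering each node type independently."""
    users, rows = np.unique(np.asarray(user_ids), return_inverse=True)
    items, cols = np.unique(np.asarray(item_ids), return_inverse=True)
    nodes_dict = {edge_type[0]: users, edge_type[2]: items}
    edge_index_dict = {edge_type: np.stack([rows, cols], axis=0)}
    return nodes_dict, edge_index_dict


# Clicks: user 5 -> items 100, 200; user 8 -> item 200.
nodes_dict, edge_index_dict = induce_bipartite([5, 5, 8], [100, 200, 200])
key = ('user', 'click', 'item')
num_users = nodes_dict['user'].size          # num_nodes('user') for ndarray values
num_clicks = edge_index_dict[key].shape[1]   # num_edges(key)
```

Row indices point into `nodes_dict['user']` and column indices into `nodes_dict['item']`, which is why the two types can reuse the same local index range.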
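Tensor data objects
Corresponding source: `nn/tf/data/`
We first organize a GSL result into a `Data` dict, or into `SubGraph`s using an `induce_func`. At this point the data is still in numpy format. We then use the `from_generator` function of `tf.data.Dataset` to convert the numpy data into tensors. A built-in `Dataset` object performs this conversion.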
Dataset
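`Dataset` uses the `from_generator` method to convert numpy data into tensor format. It exposes an initializable `iterator`, plus the `get_data_dict`, `get_egograph`, and `get_batchgraph` interfaces.
When using the `SubGraph`/`HeteroSubGraph` formats, you need to provide an inducer that converts GSL sampling results into `SubGraph`/`HeteroSubGraph`. See the inducer examples in examples/tf/sage, examples/tf/seal, and examples/tf/bipartite_sage.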
```python
class Dataset(object):
    """`Dataset` object is used to convert GSL query results to Tensor format
    `Data`. It provides methods to get raw `Data` dict, `EgoGraph`, `SubGraph`
    and `HeteroSubGraph`.
    Args:
      query: GSL query.
      window: dataset capacity.
      inducer: A `SubGraphInducer` instance to generate SubGraph/HeteroSubGraph.
    """
    def __init__(self, query, window=10, inducer=None, **kwargs):
        ...  # implementation omitted

    @property
    def iterator(self):
        return self._iterator

    def get_data_dict(self):
        """Gets a dict of tensor format `Data` corresponding to the given query.
        Keys of the dict are the aliases in the query.
        """
        return self._rds.build_data_dict(list(self._values))

    def get_egograph(self, source, neighbors=None):
        """ Organizes the data dict as EgoGraphs and then checks and returns
        the specified `EgoGraph`.
        Args:
          source(str): alias of centric vertices.
          neighbors(list of str): alias of neighbors at each hop.
            Default `None`: automatically generating the positive neighbors for
            centric vertices. It requires that each hop only has one positive
            downstream in GSL.
            Given list of string: the alias of each hop in GSL. The list must
            follow the order of traverse in GSL, and each one should be the
            positive or negative downstream for the front.
        """
        ...  # implementation omitted

    def get_batchgraph(self):
        """Gets `BatchGraph`/`HeteroBatchGraph`s.
        """
        ...  # implementation omitted
        return pos_graph, neg_graph
```
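`from_generator` expects the generator to yield a fixed structure of arrays whose dtypes are declared up front (`output_types`), so a `Data` dict has to be flattened in a deterministic order and then rebuilt on the tensor side. The sketch below shows that flattening with numpy only. Plain dicts stand in for `Data`, and the field order and helper names (`flatten`, `unflatten`, `make_generator`) are hypothetical; the real `Dataset` has its own internal logic, which is not reproduced here. With TensorFlow 1.x available, the generator could be handed to `tf.data.Dataset.from_generator(gen, output_types=...)`.

```python
import numpy as np

# Hypothetical fixed field order; None fields are skipped consistently.
FIELDS = ("ids", "int_attrs", "float_attrs", "string_attrs", "labels", "weights")


def flatten(data_dict):
    """Flatten {alias: {field: ndarray or None}} into (layout, values).
    `layout` records (alias, field) for each array, in a stable order."""
    layout, values = [], []
    for alias in sorted(data_dict):
        for field in FIELDS:
            arr = data_dict[alias].get(field)
            if arr is not None:
                layout.append((alias, field))
                values.append(np.asarray(arr))
    return layout, values


def unflatten(layout, values):
    """Rebuild the nested dict from the flat sequence produced by `flatten`."""
    out = {}
    for (alias, field), arr in zip(layout, values):
        out.setdefault(alias, {})[field] = arr
    return out


def make_generator(batches):
    """Yield flat tuples: the kind of element `from_generator` consumes."""
    for batch in batches:
        _, values = flatten(batch)
        yield tuple(values)


batch = {"u": {"ids": np.array([1, 2]), "float_attrs": np.ones((2, 3), np.float32)},
         "i": {"ids": np.array([7, 8])}}
layout, values = flatten(batch)
output_types = tuple(v.dtype for v in values)   # would map to tf dtypes
rebuilt = unflatten(layout, next(make_generator([batch])))
```

Computing `layout` once and reusing it for every batch is what keeps positions in the yielded tuple aligned with the dtypes declared in `output_types`.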
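- `get_data_dict`: converts the result of any GSL query into a `Data` dict. Each key is an alias in the GSL query, and each value is the tensor-format `Data` for the corresponding GSL node.
- `get_egograph`: converts the result of a GSL query that uses fixed-size sampling into `EgoGraph` format. The `EgoGraph` of a given ego can be retrieved by its alias.
- `get_batchgraph`: converts the result of a GSL query that uses a full-neighbor sampler into `BatchGraph` format. This method is currently experimental. It supports only one-hop full-neighbor sampling traversed from edges, and it places strict requirements on the GSL aliases, each of which must be one of the `SubKeys`.
The three data formats are described in detail below.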
Data dict
Any GSL query can be converted into a `Data` dict. Each key is an alias in the GSL query, and each value is the tensor-format `Data` for the corresponding GSL node.
EgoGraph
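`EgoGraph` represents a subgraph made up of center nodes and their k-hop neighbors. It provides `src`, `hop_node(i)`, and `hop_edge(i)` to get the center nodes, the neighbor nodes at a given hop, and the neighbor edges at a given hop.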
```python
import numpy as np


class EgoGraph(object):
    """ `EgoGraph` is a basic data structure used to describe a sampled graph.
    It consists of src `Data` and src's neighbors (nodes and edges) `Data`.
    The `EgoGraph` is mainly used to represent subgraphs generated by fixed-size
    neighbor sampling, in which the data can be efficiently organized in dense
    format and the model can be computed using the dense operators.
    Args:
      src: A `Data`/Tensor object used to describe the centric nodes.
      nbr_nodes: A list of `Data`/Tensor instances to describe neighborhood nodes.
      node_schema: A list of tuples to describe the FeatureSpec of src and
        neighbor nodes. Each tuple is formatted with (name, spec), in which `name`
        is the node's type, and `spec` is a FeatureSpec object. Be sure that
        `len(node_schema) == len(nbr_nums) + 1`.
      nbr_nums: A list of number of neighbor nodes per hop.
      nbr_edges: A list of `Data`/Tensor instances to describe neighborhood edges.
      edge_schema: A list of tuples to describe the `FeatureSpec` of neighbor edges.
    """
    def __init__(self,
                 src,
                 nbr_nodes,
                 node_schema,
                 nbr_nums,
                 nbr_edges=[],
                 edge_schema=[],
                 **kwargs):
        self._src = src
        self._nbr_nodes = nbr_nodes
        self._node_schema = node_schema
        self._nbr_edges = nbr_edges
        self._edge_schema = edge_schema
        if self._node_schema is not None:
            # One schema entry for src plus one per hop.
            assert len(self._node_schema) == len(nbr_nums) + 1
        self._nbr_nums = np.array(nbr_nums)

    @property
    def src(self):
        return self._src

    @property
    def node_schema(self):
        return self._node_schema

    @property
    def nbr_nodes(self):
        return self._nbr_nodes

    @property
    def nbr_nums(self):
        return self._nbr_nums

    @property
    def edge_schema(self):
        return self._edge_schema

    @property
    def nbr_edges(self):
        return self._nbr_edges

    def hop_node(self, i):
        """ Get the hop ith neighbors nodes of centric src, where i starts
        from zero. The return value is a tensor with shape
        [batch_size * k_1 *...* k_i, dim], where k_i is the expand neighbor
        count at hop i and dim is the sum of all feature dimensions, which
        may be different due to kinds of vertex types.
        """
        return self._nbr_nodes[i]

    def hop_edge(self, i):
        """ Get the hop ith neighbor edges of centric src, where i starts
        from zero.
        """
        if len(self._nbr_edges) == 0:
            raise ValueError("No edge data.")
        return self._nbr_edges[i]
```
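Because fixed-size sampling gives each hop a row count equal to `batch_size` times the product of the sampling counts up to that hop (see `hop_node`), neighbor features can be reshaped into dense blocks and aggregated without any index lists. The sketch below does a GraphSAGE-style mean aggregation from the outermost hop inward with numpy. It assumes features are already encoded to a single `dim` (as after `transform`) and uses a plain average to combine self and neighbor features, omitting the learned weights a real layer would apply.

```python
import numpy as np


def mean_aggregate(src, hops, nbr_nums):
    """src: [batch_size, dim]; hops[i] has batch_size * k_1 * ... * k_{i+1}
    rows; nbr_nums: [k_1, ..., k_n]. Returns [batch_size, dim]."""
    layers = [src] + list(hops)
    # Fold each hop into its parent, starting from the outermost hop.
    for i in range(len(hops), 0, -1):
        child, parent = layers[i], layers[i - 1]
        k = nbr_nums[i - 1]
        dim = child.shape[-1]
        nbr_mean = child.reshape(-1, k, dim).mean(axis=1)   # [parent_rows, dim]
        layers[i - 1] = (parent + nbr_mean) / 2.0            # simple combine, no weights
    return layers[0]


batch_size, dim, nbr_nums = 2, 4, [3, 2]
src = np.zeros((batch_size, dim))
hop0 = np.ones((batch_size * 3, dim))            # shape of hop_node(0)
hop1 = np.full((batch_size * 3 * 2, dim), 3.0)   # shape of hop_node(1)
out = mean_aggregate(src, [hop0, hop1], nbr_nums)
```

The reshape only works because every center node has exactly `k` neighbors at each hop, which is the dense-format property the `EgoGraph` docstring relies on.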
BatchGraph
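For efficient batched training, we merge a batch of `SubGraph`s into a `BatchGraph`. `BatchGraph` inherits from `SubGraph` and provides `num_graphs` to get the number of SubGraphs. It also provides `graph_node_offsets` and `graph_edge_offsets` to describe the offsets of the nodes and edges of each `SubGraph` after merging.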
```python
import numpy as np
import tensorflow as tf


class BatchGraph(SubGraph):
    """A BatchGraph object, which represents a batch of `SubGraph`s.
    Nodes, edges in subgraphs are concatenated together and their offsets
    are recorded with `graph_node_offsets` and `graph_edge_offsets`. The
    `edge_index` of each subgraph is remapped according to the order offset of
    each subgraph and then forms a new `edge_index`.
    Args:
      edge_index: concatenated edge_index of `SubGraph`s.
      nodes: A `Data`/Tensor object denoting concatenated nodes of `SubGraph`s
        with shape [batch_size, attr_num].
      node_schema: A (name, Decoder) tuple used to describe
        the nodes' feature or a list of such tuples to describe src and dst
        nodes' feature for heterogeneous graph.
      graph_node_offsets: indicates the nodes offset of each `SubGraph`.
      edges: A `Data`/Tensor object denoting concatenated edges of `SubGraph`s.
      edge_schema: A (name, Decoder) tuple used to describe the edges' feature.
      graph_edge_offsets: indicates the edges offset of each `SubGraph`.
      additional_keys: A list of keys used to indicate the additional data. Note
        that these keys must not contain the above args.
        Note that we require this argument in order to keep the correct order of
        the additional data when generating Tensor format of `BatchGraph`.
    """
    def __init__(self, edge_index, nodes, node_schema, graph_node_offsets,
                 edges=None, edge_schema=None, graph_edge_offsets=None,
                 additional_keys=[], **kwargs):
        ...  # implementation omitted

    @property
    def num_nodes(self):
        if isinstance(self._nodes.ids, np.ndarray):
            return self._nodes.ids.size
        else:
            return self._nodes.ids.shape.as_list()[0]

    @property
    def num_edges(self):
        if isinstance(self._edge_index, np.ndarray):
            return self._edge_index.shape[1]
        else:
            return self._edge_index.shape.as_list()[1]

    @property
    def num_graphs(self):
        """Number of SubGraphs.
        """
        # Same computation for numpy and Tensor inputs.
        if isinstance(self.graph_node_offsets, np.ndarray):
            return np.amax(self.graph_node_offsets) + 1
        else:
            return tf.reduce_max(self.graph_node_offsets) + 1

    @property
    def graph_node_offsets(self):
        return self._graph_node_offsets

    @property
    def graph_edge_offsets(self):
        return self._graph_edge_offsets

    @property
    def node_schema(self):
        return self._node_schema

    @property
    def edge_schema(self):
        return self._edge_schema

    @property
    def additional_keys(self):
        return self._additional_keys
```
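Merging `SubGraph`s into a `BatchGraph` amounts to concatenating nodes and edges and shifting each subgraph's `edge_index` by the number of nodes that precede it. Since `num_graphs` is computed as `max(graph_node_offsets) + 1`, the sketch assumes `graph_node_offsets` holds, for every merged node, the index of the subgraph it came from (and likewise for edges). `batch_subgraphs` is a hypothetical helper operating on `(node_ids, edge_index)` pairs rather than the library's `SubGraph` objects.

```python
import numpy as np


def batch_subgraphs(subgraphs):
    """subgraphs: list of (node_ids [n_g], edge_index [2, e_g]).
    Returns merged node ids, remapped edge_index, and per-node / per-edge
    subgraph indices."""
    node_ids, edge_index, node_graph, edge_graph = [], [], [], []
    shift = 0
    for g, (ids, ei) in enumerate(subgraphs):
        node_ids.append(ids)
        edge_index.append(ei + shift)                 # remap local rows/cols
        node_graph.append(np.full(ids.size, g))
        edge_graph.append(np.full(ei.shape[1], g))
        shift += ids.size
    return (np.concatenate(node_ids), np.concatenate(edge_index, axis=1),
            np.concatenate(node_graph), np.concatenate(edge_graph))


g0 = (np.array([3, 7, 9]), np.array([[1, 1, 0], [0, 2, 2]]))
g1 = (np.array([4, 5]), np.array([[0], [1]]))
ids, edge_index, graph_node_offsets, graph_edge_offsets = batch_subgraphs([g0, g1])
num_graphs = np.amax(graph_node_offsets) + 1   # same formula as BatchGraph.num_graphs
```

The second subgraph's edge `0 -> 1` becomes `3 -> 4` because three nodes from the first subgraph come before it in the merged node list.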
HeteroBatchGraph (since v1.1.0)
```python
import numpy as np
import tensorflow as tf


class HeteroBatchGraph(HeteroSubGraph):
    """A HeteroBatchGraph object, which represents a batch of `HeteroSubGraph`s.
    Each type of nodes and edges in subgraphs is concatenated together and their
    offsets are recorded with `graph_node_offsets_dict` and
    `graph_edge_offsets_dict`.
    Args:
      edge_index_dict: concatenated edge_index_dict of `HeteroSubGraph`s
        according to their keys.
      nodes_dict: A dict of `Data`/Tensor objects denoting concatenated nodes.
      node_schema_dict: A dict of {name: Decoder} to describe feature of each
        type of node.
      graph_node_offsets_dict: A dict, keys indicate the type of nodes,
        values indicate the offset of each HeteroSubGraph's nodes.
      edges_dict: A dict of `Data`/Tensor objects denoting concatenated edges.
      edge_schema_dict: A dict of {name: Decoder} to describe feature of each
        type of edge. The edges_dict is not None when Decoder is not None.
      graph_edge_offsets_dict: A dict, keys indicate the type of edges,
        values indicate the offset of each HeteroSubGraph's edges.
    """
    def __init__(self, edge_index_dict, nodes_dict, node_schema_dict,
                 graph_node_offsets_dict, edges_dict=None, edge_schema_dict=None,
                 graph_edge_offsets_dict=None, **kwargs):
        super(HeteroBatchGraph, self).__init__(edge_index_dict, nodes_dict)
        self._edge_index_dict = edge_index_dict
        self._nodes_dict = nodes_dict
        self._node_schema_dict = node_schema_dict
        self._edges_dict = edges_dict
        self._edge_schema_dict = edge_schema_dict
        self._graph_node_offsets_dict = graph_node_offsets_dict
        self._graph_edge_offsets_dict = graph_edge_offsets_dict
        for key, item in kwargs.items():
            self[key] = item

    def num_nodes(self, node_type):
        nodes = self._nodes_dict[node_type]
        if isinstance(nodes.ids, np.ndarray):
            return nodes.ids.size
        else:
            return nodes.ids.shape.as_list()[0]

    def num_edges(self, edge_type):
        edge_index = self._edge_index_dict[edge_type]
        if isinstance(edge_index, np.ndarray):
            return edge_index.shape[1]
        else:
            return edge_index.shape.as_list()[1]

    @property
    def num_graphs(self):
        """Number of SubGraphs.
        """
        # Any node type's offsets give the same count; take the first value
        # (iterating the dict directly would yield a key, not the offsets).
        graph_node_offsets = next(iter(self.graph_node_offsets_dict.values()))
        if isinstance(graph_node_offsets, np.ndarray):
            return np.amax(graph_node_offsets) + 1
        else:
            return tf.reduce_max(graph_node_offsets) + 1

    @property
    def graph_node_offsets_dict(self):
        return self._graph_node_offsets_dict

    @property
    def graph_edge_offsets_dict(self):
        return self._graph_edge_offsets_dict

    @property
    def node_schema_dict(self):
        return self._node_schema_dict

    @property
    def edge_schema_dict(self):
        return self._edge_schema_dict
```
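Feature processing
Corresponding source: `nn/tf/data/`
Feature processing module
With GSL results converted to tensors, we now turn to the feature processing module. Models generally expect dense vectors as input. In production, however, node features usually mix int, float, and string values that may be continuous, categorical, or multi-value, so they have to be turned into a single dense vector. Concretely, categorical and multi-value features are converted into dense vectors with `tf.nn.embedding_lookup` and then concatenated with the continuous features to form the node's input features.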
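As a rough illustration of the processing just described, the numpy sketch below turns one set of raw attributes into a dense vector. A float is passed through; an int id and a hashed string each go through an embedding lookup (`table[ids]` plays the role of `tf.nn.embedding_lookup`); a multi-value field is looked up element-wise and mean-pooled; and everything is concatenated. The table sizes, the embedding dimension, the CRC32 hash, folding multi-value ids with a modulo, and mean pooling are all assumptions for illustration, not GraphLearn's settings.

```python
import zlib
import numpy as np

rng = np.random.default_rng(0)
DIM = 4
int_table = rng.normal(size=(100, DIM))    # categorical int feature, 100 buckets
str_table = rng.normal(size=(50, DIM))     # hashed string feature, 50 buckets
multi_table = rng.normal(size=(20, DIM))   # multi-value feature, 20 buckets


def hash_bucket(s, size):
    # Stable string -> bucket id (CRC32 is an illustrative choice).
    return zlib.crc32(s.encode("utf-8")) % size


def encode(age, score, city, tags):
    """age: int id, score: float, city: str, tags: e.g. '1,5,12'."""
    parts = [
        np.array([score]),                              # numeric: pass through
        int_table[age],                                 # embedding lookup
        str_table[hash_bucket(city, len(str_table))],   # hash, then lookup
    ]
    # Multi-value: fold each id into the bucket range, look up, mean-pool.
    tag_ids = [int(t) % len(multi_table) for t in tags.split(",")]
    parts.append(multi_table[tag_ids].mean(axis=0))
    return np.concatenate(parts)


vec = encode(28, 0.2, "Hangzhou", "1,5,12")
```

The result has `1 + 3 * DIM` entries: one for the numeric feature and `DIM` for each of the three embedded features.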
FeatureColumn
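`FeatureColumn` handles different kinds of features: `NumericColumn` for continuous features, `EmbeddingColumn` for categorical features, and `SparseEmbeddingColumn` for multi-value categorical features. To speed up embedding lookup, we also provide `FusedEmbeddingColumn`, which merges features with the same dimension into a single embedding lookup.
For large embedding variables, we partition the variables with `tf.min_max_variable_partitioner` (the default).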
```python
# `Module` is the library's base module class (defined elsewhere; import omitted).
class FeatureColumn(Module):
    """ Transforms raw features to dense tensors. For continuous features, just
    return the original values, for categorical features, embeds them to dense
    vectors.

    For example, each 'user' vertex in the graph contains 6 attributes split by
    ':', which looks like '28:0:0.2:Hangzhou:1,5,12:1000008'. To handle such a
    vertex, 6 `FeatureColumn` objects are needed, each of which will return a
    dense value. And then we will concat all the dense values together to get the
    representation of this vertex.

    Each feature can be configured differently. The first two features, 28 and 0,
    are categorical, and both of them will be encoded into continuous spaces with
    dimension 12. To improve the efficiency, we can fuse the two spaces together
    to minimize the communication frequency when encoding. If the shapes of raw
    spaces are [100, 12] and [50, 12], we will get one space with shape [150, 12]
    after fusion.
    The third feature is 0.2, we just return it as a numeric feature.
    The fourth feature is a string, which needs to be transformed into an integer
    and then encoded with a continuous space.
    The fifth feature is a multi-value feature split by ','. The count of elements
    is not fixed. We need to encode each value into a continuous space and merge
    them together.
    The last feature is a big integer, and we just transform it into a continuous
    space.
    All of the above features will be handled by different FeatureColumns, and
    then concatenated by a FeatureGroup.
    """
    def __init__(self):
        pass

    def forward(self, x):
        raise NotImplementedError
```
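The fusion described in the docstring above (spaces of shape [100, 12] and [50, 12] becoming one [150, 12] space) can be reproduced by shifting each feature's ids by the total bucket size of the features before it, then doing a single lookup. This numpy sketch assumes the fused table is the row-wise concatenation of the individual tables; `fused_lookup` is hypothetical and not `FusedEmbeddingColumn`'s implementation.

```python
import numpy as np

bucket_list = [100, 50]     # bucket sizes of the two categorical features
dimension = 12
rng = np.random.default_rng(1)
tables = [rng.normal(size=(b, dimension)) for b in bucket_list]
fused = np.concatenate(tables, axis=0)        # shape [150, 12]
offsets = np.cumsum([0] + bucket_list[:-1])   # [0, 100]


def fused_lookup(ids):
    """ids: [batch_size, num_features] raw ids, one column per feature.
    Returns [batch_size, num_features * dimension] via one gather."""
    shifted = ids + offsets                    # broadcast per-column offset
    emb = fused[shifted]                       # [batch, num_features, dimension]
    return emb.reshape(ids.shape[0], -1)


ids = np.array([[28, 0], [3, 49]])
out = fused_lookup(ids)
```

One gather over the fused table replaces one gather per feature, which is where the reduced lookup and communication count comes from.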
```python
import tensorflow as tf


class PartitionableColumn(FeatureColumn):
    """ `PartitionableColumn` uses `tf.min_max_variable_partitioner` to
    partition the embedding variables. Note that `conf.emb_max_partitions`
    must be provided when using the partitioner.
    """
    def _partitioner(self):
        # `conf` is the library's global configuration object (import omitted).
        max_parts = conf.emb_max_partitions
        if max_parts is not None:
            return tf.min_max_variable_partitioner(
                max_partitions=max_parts, min_slice_size=conf.emb_min_slice_size)
        else:
            return None
```
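To get a feel for how many shards `tf.min_max_variable_partitioner` produces, the function below re-implements its arithmetic in plain Python as we understand it from the TF 1.x source. The number of shards along axis 0 is the variable's total byte size divided by `min_slice_size`, rounded up, then clamped to the range from 1 to `min(rows, max_partitions)`. Treat it as an approximation for choosing `conf.emb_max_partitions` and `conf.emb_min_slice_size`, not as TensorFlow's code; `num_partitions` is a hypothetical name, and 256 KiB is TensorFlow's default `min_slice_size`.

```python
import math


def num_partitions(shape, bytes_per_element, max_partitions,
                   min_slice_size=256 << 10):
    """Estimated shard count along axis 0 for a variable of `shape`."""
    total_bytes = math.prod(shape) * bytes_per_element
    wanted = math.ceil(total_bytes / min_slice_size)
    return max(1, min(shape[0], max_partitions, wanted))


# A [1_000_000, 16] float32 embedding table is 64,000,000 bytes.
capped = num_partitions([1_000_000, 16], 4, max_partitions=12)      # -> 12
uncapped = num_partitions([1_000_000, 16], 4, max_partitions=1000)  # -> 245
small = num_partitions([100, 12], 4, max_partitions=12)             # -> 1
```

In this model, `max_partitions` only matters for large tables; a small table stays in a single shard no matter how high the cap is set.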
```python
class NumericColumn(FeatureColumn):
    """ Represents real valued or numerical features.
    Args:
      name: A unique string identifying the input feature.
      normalizer_func: If not `None`, a function that can be used to normalize
        the value of the tensor. Normalizer function takes the input `Tensor`
        as its argument, and returns the output `Tensor`.
        (e.g. lambda x: (x - 1.0) / 2.0).
    """
    def __init__(self, name, normalizer_func=None):
        ...  # implementation omitted


class EmbeddingColumn(PartitionableColumn):
    """ Uses embedding_lookup to embed the categorical features.
    Args:
      name: A unique string identifying the input feature.
      bucket_size: The size of the embedding variable.
      dimension: The dimension of the embedding.
      need_hash: Whether to hash the input feature.
    """
    def __init__(self, name, bucket_size, dimension, need_hash=False):
        ...  # implementation omitted


class DynamicEmbeddingColumn(PartitionableColumn):
    """ EmbeddingColumn with dynamic bucket_size.
    """
    def __init__(self, name, dimension, is_string=False):
        ...  # implementation omitted


class FusedEmbeddingColumn(PartitionableColumn):
    """ Fuses the input features with the same dimension setting and then
    looks up embeddings.
    Args:
      name: A unique string identifying the input feature.
      bucket_list: A list of the sizes of the embedding variables.
      dimension: The dimension of the embedding.
    """
    def __init__(self, name, bucket_list, dimension):
        ...  # implementation omitted


class SparseEmbeddingColumn(PartitionableColumn):
    """ Uses sparse_embedding_lookup to embed the multivalent categorical
    feature which is split with delimiter.
    Args:
      name: A unique string identifying the input feature.
      bucket_size: The size of the embedding variable.
      dimension: The dimension of the embedding.
      delimiter: The delimiter of multivalent feature.
    """
    def __init__(self, name, bucket_size, dimension, delimiter):
        ...  # implementation omitted


class DynamicSparseEmbeddingColumn(PartitionableColumn):
    """ SparseEmbeddingColumn with dynamic bucket_size.
    """
    def __init__(self, name, dimension, delimiter):
        ...  # implementation omitted
```
FeatureHandler
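Each `FeatureColumn` handles one feature column, and multiple `FeatureColumn`s are grouped into a `FeatureGroup`. Depending on their order, the features of a vertex or edge may be split across several `FeatureGroup`s; this splitting logic is implemented by `FeatureHandler`. `FeatureHandler` takes the complete `feature_spec` of a vertex or edge (see "Graph operation interface -> Data source -> Decoder definition") and returns the concatenated, vectorized features of that vertex or edge. The vector dimensions must be specified through the decoder when building the graph, and the dimension of the result returned by `FeatureHandler` follows the feature configuration in the decoder.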
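Since the output dimension of `FeatureHandler` follows the decoder's feature configuration, it can be worked out before building the model. The sketch below uses a hypothetical, simplified spec (a list of `(kind, dim)` pairs) rather than the real `FeatureSpec`/`Decoder` format: numeric features contribute 1, embedded features contribute their `dim`. It also groups single-valued embedded features that share a dimension, which is the kind of set `fuse_embedding=True` could look up together; keeping multi-value (`"sparse"`) features out of the fused groups is an assumption of the sketch.

```python
from collections import OrderedDict


def plan_features(spec):
    """spec: list of (kind, dim), kind in {'numeric', 'embedding', 'sparse'}.
    Returns (output_dim, {dim: [indices of fusable embedding features]})."""
    output_dim = 0
    fusable = OrderedDict()
    for idx, (kind, dim) in enumerate(spec):
        if kind == "numeric":
            output_dim += 1
        elif kind == "embedding":
            output_dim += dim
            fusable.setdefault(dim, []).append(idx)
        elif kind == "sparse":
            output_dim += dim        # pooled multi-value feature, not fused here
        else:
            raise ValueError("unknown feature kind: %s" % kind)
    return output_dim, fusable


# Mirrors the six 'user' attributes in the FeatureColumn docstring,
# with illustrative dimensions.
spec = [("embedding", 12), ("embedding", 12), ("numeric", None),
        ("embedding", 8), ("sparse", 8), ("embedding", 16)]
output_dim, fusable = plan_features(spec)
```

For this spec the concatenated vector has 57 entries, and only the first two features (both 12-dimensional) form a fusable group of more than one feature.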
```python
class FeatureHandler(Module):
    """Encodes the input features of `Data` using `FeatureSpec`.
    For efficiency, we group the features into `FeatureGroup`s according to
    the `FeatureSpec`, then encode each `FeatureGroup` and merge their
    outputs as the final output.
    Args:
      name: A unique string.
      feature_spec: A `FeatureSpec` object to describe the input feature
        of `Data`.
      fuse_embedding: Whether to fuse the input features of the same
        specified dimension before feature encoding (embedding lookup).
    """
    def __init__(self, name, feature_spec,
                 fuse_embedding=True):
        ...  # implementation omitted
```
transform
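For `EgoGraph` and `BatchGraph`, we provide a `transform` function that performs the feature processing described above. `transform` builds a `FeatureHandler` to process the features. It also accepts a `transform_func` for custom feature preprocessing.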
`transform` of `EgoGraph`
```python
def transform(self, transform_func=None):
    """Transforms `EgoGraph`. Default transformation is encoding nodes feature
    to embedding.
    Args:
      transform_func: A function that takes in an `EgoGraph` object and returns
        a transformed version.
    """
    if transform_func is not None:
        # A user-supplied transformation replaces the default encoding.
        return transform_func(self)
    if self.node_schema is None:
        return self
    assert len(self.node_schema) == (len(self.nbr_nodes) + 1)
    # Encode the centric nodes with the first (name, FeatureSpec) entry.
    s = self.node_schema[0]
    vertex_handler = FeatureHandler(s[0], s[1])
    vertex_tensor = vertex_handler.forward(self.src)
    neighbors = []
    # Encode each hop's neighbor nodes with the matching schema entry.
    for i, nbr in enumerate(self.nbr_nodes):
        s = self.node_schema[i + 1]
        neighbor_handler = FeatureHandler(s[0], s[1])
        neighbors.append(neighbor_handler.forward(nbr))
    return EgoGraph(vertex_tensor, neighbors, None, self.nbr_nums)
```
`transform` of `BatchGraph`
```python
def transform(self, transform_func=None):
    """Transforms `BatchGraph`. Default transformation is encoding
    nodes feature to embedding.
    Args:
      transform_func: A function that takes in a `BatchGraph` object
        and returns a transformed version.
    """
    if transform_func is not None:
        # A user-supplied transformation replaces the default encoding.
        return transform_func(self)
    if self.node_schema is None:
        return self
    # node_schema is a (name, Decoder) tuple; the Decoder carries the FeatureSpec.
    vertex_handler = FeatureHandler(self.node_schema[0],
                                    self.node_schema[1].feature_spec)
    node = Data(self.nodes.ids,
                self.nodes.int_attrs,
                self.nodes.float_attrs,
                self.nodes.string_attrs)
    node_tensor = vertex_handler.forward(node)
    graph = BatchGraph(self.edge_index, node_tensor,
                       self.node_schema, self.graph_node_offsets,
                       additional_keys=self.additional_keys)
    # Carry the additional data over unchanged.
    for key in self.additional_keys:
        graph[key] = self[key]
    return graph
```
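When `attr_types` and `attr_dims` of the `Decoder` are configured at graph-construction time, `Dataset` automatically obtains the corresponding `feature_spec` from the `Decoder` when generating an `EgoGraph` or `BatchGraph`. During the model's forward pass, you can then call `transform` directly to get the feature-transformed `EgoGraph` or `BatchGraph`.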