Data layer

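A main goal of the data layer is to convert the numpy data stream returned by GraphLearn graph operations into a tensor data stream in TensorFlow 1.x. We use the from_generator interface of tf.data.Dataset to perform this conversion. In addition, to handle node and edge features uniformly, we provide feature processing interfaces that turn raw continuous features, categorical features, and multi-value categorical features into dense vectors, which serve as input to the downstream model. Below we describe how the data layer is built, in the order in which data is processed:
• Basic data objects: the basic data objects of the model layer.
• Tensor data objects: data objects in TensorFlow tensor format.
• Feature processing: interfaces for processing different types of raw features.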

Basic data objects

Source: nn/data.py, nn/subgraph.py, nn/dataset.py

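Each GraphLearn graph operation returns a Nodes or Edges object holding numpy ndarrays. To simplify processing in the model layer, we use the Data object to represent both Nodes and Edges uniformly. The result of a GSL query can therefore be represented as a Data dict, whose keys are the aliases and whose values are the corresponding Data objects.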
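To make the "Data dict" idea concrete, here is a minimal sketch of what such a dict could look like for a two-step query. `SimpleData` is a hypothetical, simplified stand-in for `Data` (only `ids` and `float_attrs`), and the aliases `seed` and `hop1` are made up for illustration; this is not the GraphLearn class itself.

```python
import numpy as np


class SimpleData(object):
  """Hypothetical, simplified stand-in for `Data`: ids plus float attributes."""

  def __init__(self, ids=None, floats=None):
    self.ids = ids
    self.float_attrs = floats


# One entry per alias in a hypothetical GSL query:
# 'seed' holds 2 seed vertices, 'hop1' holds 3 sampled neighbors per seed.
data_dict = {
    'seed': SimpleData(ids=np.array([10, 20]),
                       floats=np.random.rand(2, 4).astype(np.float32)),
    'hop1': SimpleData(ids=np.array([1, 2, 3, 4, 5, 6]),
                       floats=np.random.rand(6, 4).astype(np.float32)),
}

for alias, data in data_dict.items():
  # Each value is a batch: ids of shape [batch_size], floats [batch_size, 4].
  print(alias, data.ids.shape, data.float_attrs.shape)
```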

Data

class Data(object):
  """A plain object modeling a batch of `Nodes` or `Edges`."""
  def __init__(self,
               ids=None,
               ints=None,
               floats=None,
               strings=None,
               labels=None,
               weights=None,
               **kwargs):
    """ ints, floats and strings are attributes in numpy or Tensor format 
      with the shape
      [batch_size, int_attr_num],
      [batch_size, float_attr_num],
      [batch_size, string_attr_num].
      labels and weights are in numpy or Tensor format with the shape 
      [batch_size], [batch_size].
      The data object can be extended by any other additional data.
    """
    self.ids = ids
    self.int_attrs = ints
    self.float_attrs = floats
    self.string_attrs = strings
    self.labels = labels
    self.weights = weights
    for key, item in kwargs.items():
      self[key] = item

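    self._handler_dict = {}

  # Item access lets `self[key] = item` above attach extra data as attributes.
  def __getitem__(self, key):
    return getattr(self, key, None)

  def __setitem__(self, key, value):
    setattr(self, key, value)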

  def apply(self, func):
    """Applies the function `func` to all attributes.
    """
    for k, v in self.__dict__.items():
      if v is not None and k[:2] != '__' and k[-2:] != '__':
        self.__dict__[k] = func(v)
    return self

SubGraph

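To make it easier to model GNN algorithms, we use SubGraph to represent a sampled subgraph, which consists of edge_index, nodes, and edges. Because GSL currently cannot sample subgraphs directly, an inducer must be used explicitly to generate a SubGraph.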

class SubGraph(object):
  """ `SubGraph` is a basic data structure used to describe a sampled 
  subgraph. It consists of `edge_index`, nodes `Data` and edges `Data`.

  Args:
    edge_index: A np.ndarray object with shape [2, num_edges], which indicates
      [rows, cols] of SubGraph.
    nodes: A `Data` object denoting the input nodes.
    edges: A `Data` object denoting the input edges.

  Note that this object can be extended by any other additional data.
  """
  def __init__(self, edge_index, nodes, edges=None, **kwargs):
    self._edge_index = edge_index
    self._nodes = nodes
    self._edges = edges
    for key, item in kwargs.items():
      self[key] = item

  @property
  def num_nodes(self):
    return self._nodes.ids.size

  @property
  def num_edges(self):
    return self._edge_index.shape[1]

  @property
  def nodes(self):
    return self._nodes

  @property
  def edge_index(self):
    return self._edge_index

  @property
  def edges(self):
    return self._edges

  def __getitem__(self, key):
    return getattr(self, key, None)

  def __setitem__(self, key, value):
    setattr(self, key, value)
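Since GSL cannot sample a subgraph directly, an inducer has to turn sampled edges into `edge_index` and `nodes`. The function below is a hypothetical minimal sketch of that remapping step (it is not the inducer interface used in examples/tf/sage): it relabels global node ids to local positions, so that `edge_index` rows and cols index into the node array.

```python
import numpy as np


def induce_subgraph(src_ids, dst_ids):
  """Hypothetical inducer step: builds a local `edge_index` from sampled edges.

  src_ids, dst_ids: 1-D arrays of global node ids, one entry per sampled edge.
  Returns (unique_node_ids, edge_index), where edge_index has shape
  [2, num_edges] and holds row/col positions into unique_node_ids.
  """
  all_ids = np.concatenate([src_ids, dst_ids])
  # np.unique sorts the ids; `inverse` maps every entry of all_ids to its
  # position in the sorted unique array, i.e. a local node index.
  unique_ids, inverse = np.unique(all_ids, return_inverse=True)
  num_edges = src_ids.size
  rows = inverse[:num_edges]
  cols = inverse[num_edges:]
  return unique_ids, np.stack([rows, cols])


node_ids, edge_index = induce_subgraph(np.array([7, 7, 9]), np.array([9, 3, 3]))
# node_ids   -> [3 7 9]
# edge_index -> [[1 1 2]
#                [2 0 0]]
```

Wrapping `node_ids` in a `Data` object and passing it with `edge_index` to `SubGraph` would then give `num_nodes == 3` and `num_edges == 3`.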

HeteroSubGraph (since v1.1.0)

class HeteroSubGraph(object):
  """ A data object describing heterogeneous `SubGraph`.
  Different types of nodes and edges are represented by a dict.

  Args:
    edge_index_dict: A dict of np.ndarray objects. Each key is a tuple of 
      (src_type, edge_type, dst_type) and each value indicates [rows, cols].
    nodes_dict: A dict of `Data`/ndarray objects denoting different types of nodes.
    edges_dict: A dict of `Data`/ndarray objects denoting different types of edges.
  
  Examples:
    For meta-path "user-click-item", the HeteroSubGraph may be created as follows:
      edge_index_dict[('user', 'click', 'item')] = np.array([[0,1,2], [2,3,4]])
      edges_dict[('user', 'click', 'item')] = Data(...)
      nodes_dict['user'] = Data(...)
      nodes_dict['item'] = Data(...)
      hg = HeteroSubGraph(edge_index_dict, nodes_dict, edges_dict)

  """
  def __init__(self, edge_index_dict, nodes_dict, edges_dict=None, **kwargs):
    self._edge_index_dict = edge_index_dict
    self._nodes_dict = nodes_dict
    self._edges_dict = edges_dict
    for key, item in kwargs.items():
      self[key] = item

  def num_nodes(self, node_type):
    if isinstance(self._nodes_dict[node_type], Data):
      return self._nodes_dict[node_type].ids.size
    else:
      return self._nodes_dict[node_type].size

  def num_edges(self, edge_type):
    return self._edge_index_dict[edge_type].shape[1]

  @property
  def nodes_dict(self):
    return self._nodes_dict

  @property
  def edge_index_dict(self):
    return self._edge_index_dict

  @property
  def edges_dict(self):
    return self._edges_dict

  @property
  def keys(self):
    r"""Returns all names of graph attributes."""
    keys = [key for key in self.__dict__.keys() if self[key] is not None]
    keys = [key for key in keys if key[:2] != '__' and key[-2:] != '__']
    return keys

  @property
  def node_types(self):
    """Returns all node types of the heterogeneous subgraph."""
    return list(self._nodes_dict.keys())

  @property
  def edge_types(self):
    """Returns all edge types of the heterogeneous subgraph."""
    return list(self._edge_index_dict.keys())

  def __getitem__(self, key):
    return getattr(self, key, None)

  def __setitem__(self, key, value):
    setattr(self, key, value)

Tensor data objects

Source: nn/tf/data/

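We first organize the result of a GSL query into a Data dict, or use induce_func to organize it into SubGraphs; at this point the data is still in numpy format. We then use the from_generator function of tf.data.Dataset to convert numpy arrays to tensors. A built-in Dataset object performs this conversion.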
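tf.data.Dataset.from_generator needs a generator that yields a fixed structure of arrays with fixed dtypes. One way to picture the conversion is to flatten each Data dict into a tuple in a fixed alias order and regroup it on the other side. The NumPy sketch below shows only that flatten/rebuild step; the `ALIASES` order, the plain-dict records, and the helpers `flatten`, `rebuild`, and `batches` are hypothetical and are not the actual Dataset internals.

```python
import numpy as np

# Hypothetical fixed alias order; from_generator requires a stable structure.
ALIASES = ['seed', 'hop1']


def flatten(data_dict):
  """Turns {alias: {'ids': ..., 'floats': ...}} into a flat tuple of arrays."""
  flat = []
  for alias in ALIASES:
    flat.append(data_dict[alias]['ids'].astype(np.int64))
    flat.append(data_dict[alias]['floats'].astype(np.float32))
  return tuple(flat)


def rebuild(flat):
  """Inverse of `flatten`: regroups the flat tuple by alias."""
  return {alias: {'ids': flat[2 * i], 'floats': flat[2 * i + 1]}
          for i, alias in enumerate(ALIASES)}


def batches():
  """Stands in for repeatedly running a GSL query (3 batches here)."""
  for step in range(3):
    yield flatten({
        'seed': {'ids': np.arange(2) + step, 'floats': np.ones((2, 4))},
        'hop1': {'ids': np.arange(6) + step, 'floats': np.zeros((6, 4))},
    })


first = rebuild(next(batches()))
```

In TF 1.x, a generator like `batches` could be handed to `tf.data.Dataset.from_generator(batches, output_types=(tf.int64, tf.float32, tf.int64, tf.float32))`; keeping the alias order fixed is what lets the flat tuple be mapped back to aliases.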

Dataset

Dataset uses the from_generator method to convert numpy data to tensor format. It exposes an initializable iterator as well as the get_data_dict, get_egograph, and get_batchgraph interfaces.

When using the SubGraph/HeteroSubGraph format, you need to provide an inducer that converts the GSL sampling result into a SubGraph/HeteroSubGraph. See the inducer examples in examples/tf/sage, examples/tf/seal, and examples/tf/bipartite_sage.

class Dataset(object):
  """`Dataset` object is used to convert GSL query results to Tensor format
  `Data`. It provides methods to get raw `Data` dict, `EgoGraph`, `SubGraph` 
  and `HeteroSubGraph`.

  Args:
    query: GSL query.
    window: dataset capacity.
    inducer: A `SubGraphInducer` instance to generate SubGraph/HeteroSubGraph.
  """
  def __init__(self, query, window=10, inducer=None, **kwargs):
    ...

  @property
  def iterator(self):
    return self._iterator
  
  def get_data_dict(self):
    """Gets a dict of tensor-format `Data` corresponding to the given query.
    Keys of the dict are the aliases in the query.
    """
    return self._rds.build_data_dict(list(self._values))

  def get_egograph(self, source, neighbors=None):
    """ Organizes the data dict as EgoGraphs and then checks and returns
    the specified `EgoGraph`.
    Args:
      source(str): alias of centric vertices.
      neighbors(list of str): alias of neighbors at each hop.
        Default `None`: automatically generates the positive neighbors for
        centric vertices. It requires that each hop only has one positive
        downstream in GSL.
        Given list of string: the alias of each hop in GSL. The list must
        follow the traversal order in GSL, and each one should be the positive
        or negative downstream of the previous one.
    """
    ...

  def get_batchgraph(self):
    """Gets `BatchGraph`/`HeteroBatchGraph`s.
    """
    ...
    return pos_graph, neg_graph
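
• get_data_dict: converts the result of any GSL into a Data dict. The keys are the aliases in the GSL, and each value is the tensor-format Data for the corresponding GSL node.

• get_egograph: converts the result of a GSL containing fixed-size sampling into EgoGraph format. The EgoGraph of a given ego can be retrieved by its alias.

• get_batchgraph: converts the result of a GSL containing a full neighbor sampler into BatchGraph format. This method is still experimental: it only supports one-hop full neighbor sampling traversed from edges, and it places strict requirements on the aliases in the GSL, which must be one of the SubKeys.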

The three formats are described in detail below.

Data dict

Any GSL can be converted into a Data dict. The keys are the aliases in the GSL, and each value is the tensor-format Data for the corresponding GSL node.

EgoGraph

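EgoGraph represents a subgraph made up of centric nodes and their k-hop neighbors. It provides src, hop_node(i), and hop_edge(i) to get the centric nodes, the neighbor nodes at hop i, and the neighbor edges at hop i.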

class EgoGraph(object):
  """ `EgoGraph` is a basic data structure used to describe a sampled graph. 
  It consists of src `Data` and src's neighbors (nodes and edges) `Data`.
  The `EgoGraph` is mainly used to represent subgraphs generated by fixed-size 
  neighbor sampling, in which the data can be efficiently organized in dense 
  format and the model can be computed using the dense operators.

  Args:
    src: A `Data`/Tensor object used to describe the centric nodes.
    nbr_nodes: A list of `Data`/Tensor instances to describe neighborhood nodes.
    node_schema: A list of tuples to describe the FeatureSpec of src and
      neighbor nodes. Each tuple is formatted with (name, spec), in which `name`
      is node's type, and `spec` is a FeatureSpec object. Be sure that
      `len(node_schema) == len(nbr_nums) + 1`.
    nbr_nums: A list of number of neighbor nodes per hop.
    nbr_edges: A list of `Data`/Tensor instances to describe neighborhood edges.
    edge_schema: A list of tuples to describe the `FeatureSpec` of neighbor edges.
  """
  def __init__(self,
               src,
               nbr_nodes,
               node_schema,
               nbr_nums,
               nbr_edges=[],
               edge_schema=[],
               **kwargs):
    self._src = src
    self._nbr_nodes = nbr_nodes
    self._node_schema = node_schema
    self._nbr_edges = nbr_edges
    self._edge_schema = edge_schema
    if self._node_schema is not None:
      assert len(self._node_schema) == len(nbr_nums) + 1
    self._nbr_nums = np.array(nbr_nums)

  @property
  def src(self):
    return self._src

  @property
  def node_schema(self):
    return self._node_schema

  @property
  def nbr_nodes(self):
    return self._nbr_nodes

  @property
  def nbr_nums(self):
    return self._nbr_nums

  @property
  def edge_schema(self):
    return self._edge_schema

  @property
  def nbr_edges(self):
    return self._nbr_edges

  def hop_node(self, i):
    """ Gets the neighbor nodes of centric src at hop i, where i starts 
    from zero. The return value is a tensor with shape 
    [batch_size * k_0 * ... * k_i, dim], where k_i is the neighbor count
    at hop i and dim is the sum of all feature dimensions, which may
    differ between vertex types.
    """
    return self._nbr_nodes[i]
  
  def hop_edge(self, i):
    if len(self._nbr_edges) == 0:
      raise ValueError("No edge data.")
    return self._nbr_edges[i]
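Because every hop uses a fixed fan-out, hop i holds exactly batch_size * k_0 * ... * k_i rows, so plain reshapes are enough to group neighbors by their parent node. The NumPy sketch below checks the shapes that hop_node documents and performs one mean aggregation from hop 1 into hop 0. The batch size, fan-outs `[10, 5]`, feature dimension, and the assumption that each parent's neighbors are stored contiguously are illustrative; mean aggregation is just one possible choice.

```python
import numpy as np

batch_size, nbr_nums, dim = 4, [10, 5], 8

src = np.random.rand(batch_size, dim)

# hop i (zero-based) has batch_size * k_0 * ... * k_i rows.
hops = []
rows = batch_size
for k in nbr_nums:
  rows *= k
  hops.append(np.random.rand(rows, dim))

# Assuming each hop-0 node's nbr_nums[1] neighbors are stored contiguously,
# a reshape to [num_hop0_nodes, k_1, dim] groups them for a dense mean.
agg = hops[1].reshape(-1, nbr_nums[1], dim).mean(axis=1)
```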

BatchGraph

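For efficient batch training, we merge a batch of SubGraphs into the BatchGraph format. BatchGraph inherits from SubGraph, provides num_graphs to get the number of SubGraphs, and provides graph_node_offsets and graph_edge_offsets to represent the offsets of the nodes and edges of each SubGraph after merging.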

class BatchGraph(SubGraph):
  """A BatchGraph object, which represents a batch of `SubGraph`s.
  Nodes, edges in subgraphs are concatenated together and their offsets 
  are recorded with `graph_node_offsets` and `graph_edge_offsets`. The
  `edge_index` of each subgraph is remapped according to the node offset of 
  each subgraph and then forms a new `edge_index`.

  Args:
    edge_index: concatenated edge_index of `SubGraph`s.
    nodes: A `Data`/Tensor object denoting concatenated nodes of `SubGraph`s 
      with shape [batch_size, attr_num].
    node_schema: A (name, Decoder) tuple used to describe 
      the nodes' feature or a list of such tuple to describe src and dst nodes'
      feature for heterogeneous graph.
    graph_node_offsets: indicates the nodes offset of each `SubGraph`.
    edges: A `Data`/Tensor object denoting concatenated edges of `SubGraph`s.
    edge_schema: A (name, Decoder) tuple used to describe the edges' feature.
    graph_edge_offsets: indicates the edges offset of each `SubGraph`.
    additional_keys: A list of keys used to indicate the additional data. Note 
      that these keys must not contain the above args. 
      Note that we require this argument in order to keep the correct order of 
      the additional data when generating Tensor format of `BatchGraph`.
  """
  def __init__(self, edge_index, nodes, node_schema, graph_node_offsets,
               edges=None, edge_schema=None, graph_edge_offsets=None, 
               additional_keys=[], **kwargs):
    ...
  @property
  def num_nodes(self):
    if isinstance(self._nodes.ids, np.ndarray):
      return self._nodes.ids.size
    else:
      return self._nodes.ids.shape.as_list()[0]

  @property
  def num_edges(self):
    if isinstance(self._edge_index, np.ndarray):
      return self._edge_index.shape[1]
    else:
      return self._edge_index.shape.as_list()[1]

  @property
  def num_graphs(self):
    """number of SubGraphs.
    """
    if isinstance(self.graph_node_offsets, np.ndarray):
      return np.amax(self.graph_node_offsets) + 1
    else:
      return tf.reduce_max(self.graph_node_offsets) + 1

  @property
  def graph_node_offsets(self):
    return self._graph_node_offsets

  @property
  def graph_edge_offsets(self):
    return self._graph_edge_offsets

  @property
  def node_schema(self):
    return self._node_schema

  @property
  def edge_schema(self):
    return self._edge_schema

  @property
  def additional_keys(self):
    return self._additional_keys
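num_graphs above is computed as np.amax(graph_node_offsets) + 1, which suggests that graph_node_offsets records, for every merged node, the index of the SubGraph it came from. The hypothetical helper below sketches batching under that reading (an assumption, not the library's merge code): node arrays are concatenated, each edge_index is shifted by the number of nodes merged before it, and a per-node graph index is recorded.

```python
import numpy as np


def merge_subgraphs(subgraphs):
  """Hypothetical batching helper.

  subgraphs: list of (node_ids, edge_index) pairs, where edge_index has shape
  [2, num_edges] and holds local node positions.
  Returns concatenated node ids, a remapped edge_index and, for every node,
  the index of the subgraph it came from.
  """
  node_ids, edge_indices, graph_index = [], [], []
  offset = 0
  for g, (ids, edge_index) in enumerate(subgraphs):
    node_ids.append(ids)
    # Shift local positions by the number of nodes already merged.
    edge_indices.append(edge_index + offset)
    graph_index.append(np.full(ids.size, g))
    offset += ids.size
  return (np.concatenate(node_ids),
          np.concatenate(edge_indices, axis=1),
          np.concatenate(graph_index))


ids, edge_index, graph_index = merge_subgraphs([
    (np.array([3, 7, 9]), np.array([[1, 1, 2], [2, 0, 0]])),
    (np.array([5, 6]), np.array([[0], [1]])),
])
num_graphs = np.amax(graph_index) + 1
```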

HeteroBatchGraph (since v1.1.0)

class HeteroBatchGraph(HeteroSubGraph):
  """A HeteroBatchGraph object, which represents a batch of `HeteroSubGraph`s.
  Each type of Nodes, edges in subgraphs are concatenated together and their 
  offsets are recorded with `graph_node_offsets_dict` and 
  `graph_edge_offsets_dict`.

  Args:
    edge_index_dict: concatenated edge_index_dict of `HeteroSubGraph`s 
      according to keys.
    nodes_dict: A dict of `Data`/Tensor object denoting concatenated nodes.
    node_schema_dict: A dict of {name: Decoder} to describe feature of each
      type of node.
    graph_node_offsets_dict: A dict, keys indicate the type of nodes, 
      values indicate the offset of each HeteroSubgraph nodes.
    edges_dict: A dict of `Data`/Tensor object denoting concatenated edges.
    edge_schema_dict: A dict of {name: Decoder} to describe feature of each
      type of edge. The edges_dict is not None when Decoder is not None.
    graph_edge_offsets_dict: A dict, keys indicate the type of edges, 
      values indicate the offset of each HeteroSubgraph edges.
  """
  def __init__(self, edge_index_dict, nodes_dict, node_schema_dict, 
               graph_node_offsets_dict, edges_dict=None, edge_schema_dict=None, 
               graph_edge_offsets_dict=None, **kwargs):
    super(HeteroBatchGraph, self).__init__(edge_index_dict, nodes_dict)
    self._edge_index_dict = edge_index_dict
    self._nodes_dict = nodes_dict
    self._node_schema_dict = node_schema_dict
    self._edges_dict = edges_dict
    self._edge_schema_dict = edge_schema_dict
    self._graph_node_offsets_dict = graph_node_offsets_dict
    self._graph_edge_offsets_dict = graph_edge_offsets_dict
    for key, item in kwargs.items():
      self[key] = item

  def num_nodes(self, node_type):
    nodes = self._nodes_dict[node_type]
    if isinstance(nodes.ids, np.ndarray):
      return nodes.ids.size
    else:
      return nodes.ids.shape.as_list()[0]

  def num_edges(self, edge_type):
    edge_index = self._edge_index_dict[edge_type]
    if isinstance(edge_index, np.ndarray):
      return edge_index.shape[1]
    else:
      return edge_index.shape.as_list()[1]

  @property
  def num_graphs(self):
    """number of SubGraphs.
    """
    graph_node_offsets = next(iter(self.graph_node_offsets_dict.values()))
    if isinstance(graph_node_offsets, np.ndarray):
      return np.amax(graph_node_offsets) + 1
    else:
      return tf.reduce_max(graph_node_offsets) + 1

  @property
  def graph_node_offsets_dict(self):
    return self._graph_node_offsets_dict

  @property
  def graph_edge_offsets_dict(self):
    return self._graph_edge_offsets_dict

  @property
  def node_schema_dict(self):
    return self._node_schema_dict

  @property
  def edge_schema_dict(self):
    return self._edge_schema_dict

Feature processing

Source: nn/tf/data/

Feature processing module

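So far we have converted GSL results into tensors; next we introduce the feature processing module. Models generally expect a dense vector as input. In production, node features often include int, float, and string features that may be continuous, categorical, or multi-value, so they must be processed into a dense vector. Specifically, categorical and multi-value features are converted into dense vectors with tf.nn.embedding_lookup and then concatenated with the continuous features to form the node's input features.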
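The "embed, then concatenate" step can be illustrated without TensorFlow. In the NumPy sketch below, `np.take` on a single table plays the role of tf.nn.embedding_lookup on one unpartitioned variable; the feature layout (one categorical column with bucket size 4, two float columns) and the embedding dimension 12 are made-up values.

```python
import numpy as np

rng = np.random.default_rng(0)

batch_size = 3
# Hypothetical raw features: one categorical int column and two float columns.
city_ids = np.array([0, 2, 2])           # categorical, bucket_size = 4
floats = rng.random((batch_size, 2))     # continuous, used as-is

# Embedding table of shape [bucket_size, dimension]; looking up rows by id
# is what tf.nn.embedding_lookup does for a single, unpartitioned table.
embedding_table = rng.random((4, 12))
city_emb = np.take(embedding_table, city_ids, axis=0)    # [3, 12]

# Concatenate continuous and embedded features into one dense input vector.
node_input = np.concatenate([floats, city_emb], axis=1)  # [3, 14]
```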

FeatureColumn

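FeatureColumn handles different types of features: NumericColumn for continuous features, EmbeddingColumn for categorical features, and SparseEmbeddingColumn for multi-value categorical features. To speed up embedding_lookup, we also provide FusedEmbeddingColumn, which merges features with the same dimension and looks them up together. When embedding variables are large, we use tf.min_max_variable_partitioner (the default) to partition the variables.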
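The fusion idea from the FeatureColumn docstring (two spaces of shape [100, 12] and [50, 12] fused into one [150, 12] space) can be sketched with an id-offset trick: stack the tables and shift each feature's ids by the total size of the tables before it, so one lookup serves both features. This NumPy version is an assumed illustration of the technique, not FusedEmbeddingColumn's actual implementation.

```python
import numpy as np

rng = np.random.default_rng(0)
dimension = 12
bucket_list = [100, 50]       # two categorical features, same dimension

# Separate tables ([100, 12] and [50, 12]) are stacked into one [150, 12] table.
tables = [rng.random((size, dimension)) for size in bucket_list]
fused_table = np.concatenate(tables, axis=0)

# Each feature's ids are shifted by the total size of the tables before it.
offsets = np.cumsum([0] + bucket_list[:-1])       # [0, 100]
feature_ids = np.array([[28, 0], [3, 49]])        # [batch_size, num_features]
fused_ids = feature_ids + offsets                 # broadcast over the batch

# One lookup for all fused features, then flatten to [batch_size, 2 * 12].
emb = np.take(fused_table, fused_ids, axis=0)     # [batch, num_features, dim]
flat = emb.reshape(len(feature_ids), -1)
```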

class FeatureColumn(Module):
  """ Transforms raw features to dense tensors. For continuous features, just 
  return the original values, for categorical features, embeds them to dense
  vectors.

  For example, each 'user' vertex in the graph contains 6 attributes split by
  ':', which looks like '28:0:0.2:Hangzhou:1,5,12:1000008'. To handle such a
  vertex, 6 `FeatureColumn` objects are needed, each of which will return a
  dense value. And then we will concat all the dense values together to get the
  representation of this vertex.

  Each feature can be configured differently. The first two features, 28 and 0,
  are categorical, and both of them will be encoded into continuous spaces with
  dimension 12. To improve the efficiency, we can fuse the two spaces together
  to minimize the communication frequency when encoding. If the shapes of raw
  spaces are [100, 12] and [50, 12], we will get one space with shape [150, 12]
  after fusion.

  The third feature is 0.2, we just return it as a numeric feature.

  The fourth feature is a string, which needs to be transformed into an integer
  and then encoded with a continuous space.

  The fifth feature is a multi-value feature split by ','. The count of elements is
  not fixed. We need to encode each value into a continuous space and merge
  them together.

  The last feature is a big integer, which is likewise encoded into a
  continuous space.

  All of the above features will be handled by different FeatureColumns, and
  then concatenated by a FeatureGroup.
  """

  def __init__(self):
    pass

  def forward(self, x):
    raise NotImplementedError


class PartitionableColumn(FeatureColumn):
  """ `PartitionableColumn` uses `tf.min_max_variable_partitioner` to 
  partition the embedding variables. Note that `conf.emb_max_partitions` 
  must be provided when using the partitioner.
  """
  def _partitioner(self):
    max_parts = conf.emb_max_partitions
    if max_parts is not None:
      return tf.min_max_variable_partitioner(
          max_partitions=max_parts, min_slice_size=conf.emb_min_slice_size)
    else:
      return None


class NumericColumn(FeatureColumn):
  """ Represents real valued or numerical features.
  Args:
    name: A unique string identifying the input feature.
    normalizer_func: If not `None`, a function that can be used to normalize 
      the value of the tensor. Normalizer function takes the input `Tensor` 
      as its argument, and returns the output `Tensor`
      (e.g. lambda x: (x - 1.0) / 2.0).
  """
  def __init__(self, name, normalizer_func=None):
    ...


class EmbeddingColumn(PartitionableColumn):
  """ Uses embedding_lookup to embed the categorical features.
  Args:
    name: A unique string identifying the input feature.
    bucket_size: The size of the embedding variable.
    dimension: The dimension of the embedding.
    need_hash: Whether to hash the input feature.
  """
  def __init__(self, name, bucket_size, dimension, need_hash=False):
    ...

class DynamicEmbeddingColumn(PartitionableColumn):
  """ EmbeddingColumn with dynamic bucket_size.
  """
  def __init__(self, name, dimension, is_string=False):
    ...


class FusedEmbeddingColumn(PartitionableColumn):
  """ Fuses the input feature with the same dimension setting and then
  lookups embeddings.
  Args:
    name: A unique string identifying the input feature.
    bucket_list: A list of the size of the embedding variable.
    dimension: The dimension of the embedding.
  """
  def __init__(self, name, bucket_list, dimension):
    ...


class SparseEmbeddingColumn(PartitionableColumn):
  """ Uses sparse_embedding_lookup to embed the multivalent categorical 
  feature which is split with delimiter.
  Args:
    name: A unique string identifying the input feature.
    bucket_size: The size of the embedding variable.
    dimension: The dimension of the embedding.
    delimiter: The delimiter of multivalent feature.
  """
  def __init__(self, name, bucket_size, dimension, delimiter):
    ...


class DynamicSparseEmbeddingColumn(PartitionableColumn):
  """ SparseEmbeddingColumn with dynamic bucket_size.
  """
  def __init__(self, name, dimension, delimiter):
    ...
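For a multi-value feature such as '1,5,12', the number of ids per row varies, so each row's embeddings must be combined into one fixed-width vector. The NumPy sketch below splits on the delimiter, looks up each id, and mean-combines the rows; the mean combiner, bucket size 20, and dimension 4 are illustrative assumptions rather than SparseEmbeddingColumn's documented settings.

```python
import numpy as np

rng = np.random.default_rng(0)
bucket_size, dimension, delimiter = 20, 4, ','
table = rng.random((bucket_size, dimension))

raw = ['1,5,12', '3', '7,7']   # variable number of values per row


def sparse_embed(values):
  """Embeds each id of a delimited string and mean-combines them."""
  ids = np.array([int(v) for v in values.split(delimiter)])
  return table[ids].mean(axis=0)


out = np.stack([sparse_embed(v) for v in raw])   # [3, dimension]
```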

FeatureHandler

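A FeatureColumn handles one feature column, and multiple FeatureColumns are aggregated into a FeatureGroup. Depending on their order, the features of a vertex or edge may be divided into several FeatureGroups; this division is implemented by FeatureHandler. FeatureHandler takes the complete feature_spec of a vertex or edge (see "Graph Operation Interfaces -> Data Source -> Decoder Definition") and returns the concatenated, vectorized features of that vertex or edge. The vector dimensions must be specified through the decoder when building the graph, and the dimension of FeatureHandler's output corresponds to the feature configuration in the decoder.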
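To show how the output width follows from the decoder's configuration, the sketch below groups the six features of the 'user' example and sums their widths. The `(kind, dim)` spec format, the rule that only same-dimension int features are fused, and both helper functions are hypothetical simplifications; the real FeatureSpec and FeatureHandler grouping logic are not spelled out in this section.

```python
# Hypothetical, simplified feature spec for the 'user' example:
# (kind, embedding dimension, or None for numeric). Not the real FeatureSpec.
feature_spec = [
    ('int', 12), ('int', 12), ('float', None),
    ('string', 8), ('multi', 8), ('int', 16),
]


def group_features(spec, fuse_embedding=True):
  """Groups feature positions; same-dimension int features share a group."""
  groups = {}
  for pos, (kind, dim) in enumerate(spec):
    if kind == 'float':
      key = ('numeric', pos)
    elif kind == 'int' and fuse_embedding:
      key = ('fused', dim)
    else:
      key = (kind, pos)   # strings, multi-value and unfused ints stay separate
    groups.setdefault(key, []).append(pos)
  return groups


def output_dim(spec):
  """Width of the concatenated vector: 1 per numeric column, dim per embedding."""
  return sum(1 if kind == 'float' else dim for kind, dim in spec)
```

With this spec, the two 12-dimensional int features land in one fused group and the concatenated output is 12 + 12 + 1 + 8 + 8 + 16 = 57 wide.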

class FeatureHandler(Module):
  """Encodes the input features of `Data` using `FeatureSpec`.
  For efficiency, we group the features into `FeatureGroup` according to 
  the `FeatureSpec` and then encode each `FeatureGroup` and merge their 
  outputs as the final output.

  Args:
    name: A unique string.
    feature_spec: A `FeatureSpec` object to describe the input feature 
      of `Data`.
    fuse_embedding: Whether to fuse the input features of the same 
      specified dimension before feature encoding (embedding lookup).
  """
  def __init__(self, name, feature_spec,
               fuse_embedding=True):
    ...

transform

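For EgoGraph and BatchGraph, we provide a transform function that performs the feature processing and transformation described above. transform builds a FeatureHandler to process the features, and it also accepts a transform_func for feature preprocessing.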

transform of EgoGraph

  def transform(self, transform_func=None):
    """transforms `EgoGraph`. Default transformation is encoding nodes feature 
    to embedding.
    Args:
      transform_func: A function that takes in an `EgoGraph` object and returns 
        a transformed version. 
    """
    if self.node_schema is None:
      return self

    assert len(self.node_schema) == (len(self.nbr_nodes) + 1)

    s = self.node_schema[0]
    vertex_handler = FeatureHandler(s[0], s[1])
    vertex_tensor = vertex_handler.forward(self.src)

    neighbors = []
    for i, nbr in enumerate(self.nbr_nodes):
      s = self.node_schema[i + 1]
      neighbor_handler = FeatureHandler(s[0], s[1])
      neighbor_tensor = neighbor_handler.forward(nbr)
      neighbors.append(neighbor_tensor)

    return EgoGraph(vertex_tensor, neighbors, None, self.nbr_nums)

transform of BatchGraph

  def transform(self, transform_func=None):
    """transforms `BatchGraph`. Default transformation is encoding 
    nodes feature to embedding.
    Args:
      transform_func: A function that takes in a `BatchGraph` object 
        and returns a transformed version. 
    """
    if self.node_schema is None:
      return self
    vertex_handler = FeatureHandler(self.node_schema[0],
                                    self.node_schema[1].feature_spec)
    node = Data(self.nodes.ids, 
                self.nodes.int_attrs, 
                self.nodes.float_attrs, 
                self.nodes.string_attrs)
    node_tensor = vertex_handler.forward(node)
    graph = BatchGraph(self.edge_index, node_tensor, 
                       self.node_schema, self.graph_node_offsets,
                       additional_keys=self.additional_keys)
    for key in self.additional_keys:
      graph[key] = self[key]
    return graph

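When attr_types and attr_dims are configured in the Decoder at graph construction time, Dataset automatically obtains the Decoder's feature_spec when generating an EgoGraph or BatchGraph. In the model's forward pass, you can call transform directly to get the feature-transformed EgoGraph or BatchGraph.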