快速开始

本文档包含三个部分

  • 如何基于GL快速跑通一个GNN模型

  • 如何把数据加载到GL中,以及如何使用图数据、图采样、负采样等接口

  • GraphSAGE为例,说明如何基于GL和TensorFlow开发一个自己的GNN模型

跑通内置模型

GL内置了一些常见模型,如GCNGraphSAGE,以及数据集cora、ppi等。 我们从跑通cora数据的顶点分类任务开始接触GL。完整模型代码请参考模型示例

# 准备数据
cd graph-learn/examples/data/
python cora.py

# 训练、模型评估
cd ../tf/ego_sage/
python train_supervised.py

GL接口使用

GL为GNN的开发提供了大量基础接口,我们提供了图接口使用示例以展示如何基于GL来构图、查询、采样、负采样。

在开始前,我们需要准备一份图数据源,这里准备了一个生成数据的脚本gen_test_data.py,用于生成顶点和边的本地数据。

准备测试脚本test_dist_server_mode_fs_tracker.py如下:

import getopt
import os
import sys

import graphlearn as gl
from query_examples import *

def main(argv):
  cur_path = sys.path[0]

  server_count = -1
  client_count = -1
  tracker = ""
  job_name = ""
  task_index = -1

  opts, args = getopt.getopt(argv,
                             's:c:t:j:ti:',
                             ['server_count=', 'client_count=', 'tracker=',
                              'job_name=', 'task_index='])
  for opt, arg in opts:
    if opt in ('-s', '--server_count'):
      server_count = int(arg)
    elif opt in ('-c', '--client_count'):
      client_count = int(arg)
    elif opt in ('-t', '--tracker'):
      tracker = arg
    elif opt in ('-j', '--job_name'):
      job_name = arg
    elif opt in ('-ti', '--task_index'):
      task_index = int(arg)
    else:
      pass

  g = gl.Graph()

  g.node(os.path.join(cur_path, "data/user"),
         node_type="user", decoder=gl.Decoder(weighted=True)) \
    .node(os.path.join(cur_path, "data/item"),
          node_type="item", decoder=gl.Decoder(attr_types=['string', 'int', 'float', 'float', 'string'])) \
    .edge(os.path.join(cur_path, "data/u-i"),
          edge_type=("user", "item", "buy"), decoder=gl.Decoder(weighted=True), directed=False) 

  cluster={"server_count": server_count, "client_count": client_count, "tracker":tracker}
  g.init(cluster=cluster, job_name=job_name, task_index=task_index)

  if job_name == "server":
    print("Server {} started.".format(task_index))
    g.wait_for_close()

  if job_name == "client":
    print("Client {} started.".format(task_index))

    query = g.V("user").batch(32).shuffle(traverse=True).alias("src") \
          .outV("buy").sample(5).by("edge_weight").alias("hop1") \
          .inE("buy").sample(2).by("random").alias("hop1-hop2") \
          .inV().alias("hop2") \
          .values()

    ds = gl.Dataset(query)
    epoch = 2
    for i in range(epoch):
      step = 0
      while True:
        try:
          res = ds.next()
          src_nodes = res["src"]
          print(src_nodes.ids)
        except gl.OutOfRangeError:
          break

    g.close()
    print("Client {} stopped.".format(task_index))

if __name__ == "__main__":
  main(sys.argv[1:])

query_examples.py脚本中展示了更多的图接口的使用示例以供参考。

准备完数据和代码后,我们在本地拉起5个进程,2个server,3个worker,分布式执行。

#!/usr/bin/env bash
HERE=$(cd "$(dirname "$0")";pwd)

rm -rf $HERE/tracker
mkdir -p $HERE/tracker

# Only generating data when ./data folder is not existed.
# If `gen_test_data.py` is modified, then delete the data folder first.
if [ ! -d "$HERE/data" ]; then
  mkdir -p $HERE/data
  python $HERE/gen_test_data.py
fi

# Start a graphlearn cluster with 2 servers(processes) and 3 clients(processes).
python $HERE/test_dist_server_mode_fs_tracker.py \
  --server_count=2 --client_count=3 --tracker=$HERE/tracker --job_name="server" --task_index=0 &
sleep 1
python $HERE/test_dist_server_mode_fs_tracker.py \
  --server_count=2 --client_count=3 --tracker=$HERE/tracker --job_name="server" --task_index=1 &
sleep 1
python $HERE/test_dist_server_mode_fs_tracker.py \
  --server_count=2 --client_count=3 --tracker=$HERE/tracker --job_name="client" --task_index=0 &
sleep 1
python $HERE/test_dist_server_mode_fs_tracker.py \
  --server_count=2 --client_count=3 --tracker=$HERE/tracker --job_name="client" --task_index=1 &
sleep 1
python $HERE/test_dist_server_mode_fs_tracker.py \
  --server_count=2 --client_count=3 --tracker=$HERE/tracker --job_name="client" --task_index=2

开发一个GNN模型

下面将基于GLTensorFlow开发一个有监督的GraphSAGE模型,并在Cora数据上训练。

数据准备

我们使用开源数据集Cora,它包含了机器学习的一些论文,以及论文之间的引用关系,每篇论文包含1433个属性。这些论文可以划分为7种类别:Case_Based,Genetic_Algorithms,Neural_Networks,Probabilistic_Methods,Reinforcement_Learning,Rule_Learning,Theory。该GNN任务的目的是预测论文的分类。我们将开源的Cora数据进行处理,得到我们构图所需的数据格式。Cora数据下载和处理的脚本参考cora.py

cd graph-learn/examples/data
python cora.py

产出边数据和顶点数据。其中,边数据即论文之间的引用关系,一篇论文由其他至少一篇论文引用; 顶点数据,即论文的词汇表示,包括论文的属性和标签,属性总共1433个维度,论文类别有7类,因此label值域设置为0~6。

src_id:int64   dst_id:int64
35  1033
35  103482
35  103515
id:int64  label:int32   feature:string
31336      4    0.0:0.0:...
1061127    1    0.0:0.05882353:...
1106406    2    0.0:0.0:...

顶点数据除了id以外,包含label和attributes,其中attributes为1433个float。边数据除了两个端点id以外,还包含边的权重。 数据格式通过gl.Decoder类描述。

import graphlearn as gl

# 描述顶点表的数据格式,包含lable和attributes
node_decoder = gl.Decoder(labeled=True, attr_types=["float"] * args.features_num)

# 表示边表的数据格式,除了端点id以外,还有边的权重
edge_decoder = gl.Decoder(weighted=True)

图构建

图构建的过程是将顶点数据和边数据加载到内存中,转换为逻辑上的图格式。构建完成后,可供查询和采样。

import graphlearn as gl

def load_graph(args):
  dataset_folder = args.dataset_folder
  node_type = args.node_type
  edge_type = args.edge_type
  g = gl.Graph()                                                           \
        .node(dataset_folder + "node_table", node_type=node_type,
              decoder=node_decoder)                      \
        .edge(dataset_folder + "edge_table",
              edge_type=(node_type, node_type, edge_type),
              decoder=edge_decoder, directed=False)           \
        .node(dataset_folder + "train_table", node_type=node_type,
              decoder=gl.Decoder(weighted=True), mask=gl.Mask.TRAIN)       \
        .node(dataset_folder + "val_table", node_type=node_type,
              decoder=gl.Decoder(weighted=True), mask=gl.Mask.VAL)         \
        .node(dataset_folder + "test_table", node_type=node_type,
              decoder=gl.Decoder(weighted=True), mask=gl.Mask.TEST)
  return g


# 调用.init()进行初始化。这里以单机运行为例,分布式详见[图对象-初始化数据](graph_object.md)。
g.init()

图采样

为了实现GraphSAGE,需要进行图采样以作为上层网络的输入。在这里,我们的采样顺序为:
(1) 按batch采样种子“item”顶点;
(2) 采样上述顶点沿着“relation”边的1-hop邻居和2-hop邻居;
(3) 获取路径的上所有顶点的属性和种子顶点的labels。

这里我们定义了一个图采样query,通过遍历图,得到每一次迭代的batch的样本数据。

def query(graph, args):
  prefix = 'train'
  assert len(args.nbrs_num) == args.hops_num
  bs = args.train_batch_size
  q = graph.V(args.node_type, mask=gl.Mask.TRAIN).batch(bs).alias(prefix)
  for idx, hop in enumerate(args.nbrs_num):
    alias = prefix + '_hop' + str(idx)
    q = q.outV(args.edge_type).sample(hop).by('random').alias(alias)
  return q.values()

模型代码

  • 定义loss和accuracy计算函数,并定义train函数,将图上query产生的样本输入给模型。

def supervised_loss(logits, labels):
  loss = tf.nn.sparse_softmax_cross_entropy_with_logits(
      labels=labels, logits=logits)
  return tf.reduce_mean(loss)

def accuracy(logits, labels):
  indices = tf.math.argmax(logits, 1, output_type=tf.int32)
  correct = tf.reduce_sum(tf.cast(tf.math.equal(indices, labels), tf.float32))
  return correct / tf.cast(tf.shape(labels)[0], tf.float32)

def train(graph, model, args):
  tfg.conf.training = True
  query_train = query(graph, args)
  dataset = tfg.Dataset(query_train, window=5)
  eg_train = dataset.get_egograph('train')
  train_embeddings = model.forward(eg_train)
  loss = supervised_loss(train_embeddings, eg_train.src.labels)
  return dataset.iterator, loss
  • 定义GNN模型

# ego_sage.py
import tensorflow as tf
import graphlearn.python.nn.tf as tfg


class EgoGraphSAGE(tfg.EgoGNN):
  def __init__(self,
               dims,
               agg_type="mean",
               bn_func=None,
               act_func=tf.nn.relu,
               dropout=0.0,
               **kwargs):
    assert len(dims) > 1

    layers = []
    for i in range(len(dims) - 1):
      conv = tfg.EgoSAGEConv("homo_" + str(i),
                             in_dim=dims[i],
                             out_dim=dims[i + 1],
                             agg_type=agg_type)
      # If the len(dims) = K, it means that (K-1) LEVEL layers will be added. At
      # each LEVEL, computation will be performed for each two adjacent hops,
      # such as (nodes, hop1), (hop1, hop2) ... . We have (K-1-i) such pairs at
      # LEVEL i. In a homogeneous graph, they will share model parameters.
      layer = tfg.EgoLayer([conv] * (len(dims) - 1 - i))
      layers.append(layer)

    super(EgoGraphSAGE, self).__init__(layers, bn_func, act_func, dropout)
  • 开始训练

def run(args):
  gl.set_tape_capacity(1)
  g = load_graph(args)
  g.init()
  # Define Model
  dims = [args.features_num] + [args.hidden_dim] * (args.hops_num - 1) \
        + [args.class_num]
  model = EgoGraphSAGE(dims,
                       agg_type=args.agg_type,
                       act_func=tf.nn.relu,
                       dropout=args.in_drop_rate)
  # train and test
  train_iterator, loss = train(g, model, args)
  optimizer=tf.train.AdamOptimizer(learning_rate=args.learning_rate)
  train_op = optimizer.minimize(loss)
  train_ops = [loss, train_op]
  test_iterator, test_acc = test(g, model, args)
  with tf.Session() as sess:
    sess.run(tf.local_variables_initializer())
    sess.run(tf.global_variables_initializer())
    sess.run(train_iterator.initializer)
    step = 0
    print("Start Training...")
    for i in range(args.epoch):
      try:
        while True:
          ret = sess.run(train_ops)
          print("Epoch {}, Iter {}, Loss {:.5f}".format(i, step, ret[0]))
          step += 1
      except tf.errors.OutOfRangeError:
        sess.run(train_iterator.initializer) # reinitialize dataset.
  g.close()


if __name__ == "__main__":
  cur_path = sys.path[0]
  argparser = argparse.ArgumentParser("Train EgoSAGE Supervised.")
  argparser.add_argument('--dataset_folder', type=str,
                         default=os.path.join(cur_path, '../../data/cora/'),
                         help="Dataset Folder, list files are node_table, edge_table, "
                              "train_table, val_table and test_table")
  argparser.add_argument('--class_num', type=int, default=7)
  argparser.add_argument('--features_num', type=int, default=1433)
  argparser.add_argument('--train_batch_size', type=int, default=140)
  argparser.add_argument('--hidden_dim', type=int, default=128)
  argparser.add_argument('--in_drop_rate', type=float, default=0.5)
  argparser.add_argument('--hops_num', type=int, default=2)
  argparser.add_argument('--nbrs_num', type=list, default=[25, 10])
  argparser.add_argument('--agg_type', type=str, default="gcn")
  argparser.add_argument('--learning_algo', type=str, default="adam")
  argparser.add_argument('--learning_rate', type=float, default=0.05)
  argparser.add_argument('--weight_decay', type=float, default=0.0005)
  argparser.add_argument('--epoch', type=int, default=40)
  argparser.add_argument('--node_type', type=str, default='item')
  argparser.add_argument('--edge_type', type=str, default='relation')
  args = argparser.parse_args()

  run(args)