Graph Sampling Language(GSL)

简介

GNN发展至今,有一套相对成熟的编程范式。我们将一个GNN模型的开发分为两个阶段:图模式的数据处理部分和神经网络部分。 神经网络部分,我们可以借助成熟的DL框架,如TensorFlow、PyTorch。 如何简单高效的描述和实现适合GNN的图数据访问模式,以及如何与主流的深度学习框架对接,是GL重点关注的。

实际中的图由于规模原因,采样是必要阶段。总结下来,图采样大致包含几类:

  • 遍历型(Traverse),从图上获取一个batch的点或边数据;

  • 关系型(Neighborhood,Subgraph),获取点的N跳邻域或生成由点构成的子图,用于构造训练样本;

  • 负采样(Negative),与关系型相对,一般用于无监督训练场景,产生负例样本;


我们把如上几类操作抽象成了一套接口,称为 GSL。 开发者可以基于GSL描述用于GNN的图数据流,例如,针对“用户点击商品”异构图场景,“随机采样64个user,并按边的权重为每个user采样10个关联商品”, 可以写成
g.V("user").batch(64).outV("click").sample(10).by("edge_weight")

GSL考虑实际图数据的特点,覆盖了对超大图、异构图、属性图的支持,语法设计贴近Gremlin形式,便于理解。

GSL语法

我们把一个由GSL描述的语句称为Query,一个Query通常由SOURCESTEPSINK三种类型的操作组成。 其中,SOURCE为查询语句的入口,表示从哪些数据出发;STEP为查询过程中游走和采样的路径,SINK为对执行结果的封装。 在GSL中,一条Query必须包含一个SOURCE一个SINK操作。

SOURCE

V/E

SOURCE是查询语句的入口,支持 V() 和 E() 两个接口,分别表示从顶点和从边开始查询。具体如下。

def V(t, node_from=gl.NODE)
"""
Args:
    t(string): 顶点类型或边类型,后续操作会针对该类型的顶点进行;
    node_from: gl.NODE | gl.EDGE_SRC | gl.EDGE_DST,分别为对指定的顶点类型遍历,对指定边类型的源顶点遍历,对指定边类型的目的顶点遍历。
"""

def E(edge_type, reverse=False)
"""
Args:
    edge_type(string): 边类型,后续操作会针对该类型的边进行;
    reverse(boolean): 是否遍历反向边,默认为False,及遍历原始的边。
"""
  • 无向边遍历

构图时添加边,指定为无向边。

g.edge(source, (src_type, dst_type, edge_type), decoder, directed=False)

遍历时有几下几种情况:
(1)src_type和dst_type一致,如边类型为(“item”, “item”, “i2i”),当i2i为无向边时,边会增加一份dst到src的连接,同时去重。

原始i2i数据                     加载到图中的i2i数据   
item	item         ->         item	item
1	2                           1	2
2	1                           2	1
1	3                           1	3
2	3                           3	1
3	3                           2	3
1	4                           3	2
                                3	3
                                1	4
                                4	1

g.E("i2i"): 遍历double and unique的i2i数据,如上”加载到图中的i2i数据 “。
g.E("i2i", reverse=Ture): 不支持,因为这时候i2i是无向边,不区分正向和反向。

(2)src_type和dst_type不一致,如边类型为(“user”, “item”, “u2i”),当u2i为无向边时,在加载时实际上除了加载原始的u2i边之外,额外加载了一份i2u的反向边。

原始u2i数据                 加载到图中的u2i数据    +     加载到图中的i2u数据
user	item            ->      user	item                 item	user
1	2                           1	2                        2	1
2	1                           2	1                        1	2
1	3                           1	3												 3	1
2	3                           2	3												 3	2
3	3                           3	3												 3	3
1	4                           1	4												 4	1

g.E("u2i"): 遍历”加载到图中的u2i数据 “,即原始数据。
g.E("u2i", reverse=Ture): 遍历“加载到图中的i2u数据”,即反向数据。

batch

当从图获取数据时,batch()用于指定每次获取到的V()或E()的多少。若同时要对数据进行shuffle(),注意在batch()之后。当shuffle(traverse=True)时,如果剩余数据不足batch_size但不为0,则返回实际数据,此时不会触发OutOfRangeError。只有剩余数据为0时,才会OutOfRangeError

def batch(batch_size):
"""
Args:
	batch_size(int): 每次获取的源数据的多少。
"""

shuffle

batch之后可以接 shuffleshuffle() 为可选接口,当从图获取数据时,表示是否要对顶点/边进行随机获取。

def shuffle(traverse=False):
"""
Args:
  traverse(boolean): 是否要随机获取数据源,默认False。
      True表示按序遍历,当遍历到结尾时会触发OutOfRangeError,False则不会OutOfRangeError。
"""

示例

(1)顺序遍历图中的user顶点,batch size为64,当遍历完全图user顶点后,OutOfRange。

query = g.V("user").batch(64).alias('seed').values()
ds = gl.Dataset(query)
while True:
  try:
    ds.next()
  except gl.OutOfRangeError:
    break

(2)随机有放回地遍历64个user类型的顶点。永远不会OutOfRange。

query = g.V("user").batch(64).shuffle().alias('seed').value()
ds = gl.Dataset(query)
for i in range(100):
  ds.next()

(3)随机无放回地遍历64个user类型的顶点,当遍历完全图的user顶点后,OutOfRange。

query = g.V("user").batch(64).shuffle(traverse=True).alias('seed').values()
ds = gl.Dataset(query)
while True:
  try:
    ds.next()
  except gl.OutOfRangeError:
    break

通过E遍历边的方式类似。

STEP 图采样/负采样

STEP用于描述查询游走的路径和采样的方式。一条查询语句中可以包含0到多个STEP。目前,STEP包含以下两类接口:描述路径游走,描述采样方式。一个完整的STEP包含游走+采样(图采样或负采样)。

描述路径游走的接口,表示当前操作对象的转移。例如,g.V()表示当前操作对象为顶点,可以通过outE()把当前操作对象转移到顶点的出边上,后面的操作表示针对边来进行。当边的操作结束后,可以通过inV() / outV(),再将焦点转移到边对应的目的顶点或源顶点上。

(1)游走接口:

def inV():
""" 承接边(只能跟在边操作之后),表示两个顶点中箭头指向的那个,即目的顶点
"""

def outV():   
""" 承接边(只能跟在边操作之后),表示两个顶点中箭头发起的那个,即源顶点
"""

def inE(edge_type):
""" 承接顶点(只能跟在顶点操作之后),表示顶点的入边
Args:
	edge_type(string): 边的类型,当该边为有向边时, 调用inE报错
"""

def outE(edge_type):
""" 承接顶点(只能跟在顶点操作之后),表示顶点的出边
Args:
	edge_type(string): 边的类型
"""

def inV(edge_type):
""" 承接顶点(只能跟在顶点操作之后),表示顶点沿着边的上游顶点
Args:
	edge_type(string): 边的类型,当该边为有向边时, 调用inV报错
"""

def outV(edge_type):
""" 承接顶点(只能跟在顶点操作之后),表示顶点沿着边的下游顶点
Args:
	edge_type(string): 边的类型
"""

def outNeg(edge_type):
""" 承接顶点(只能跟在顶点操作之后),表示在该边类型下负采样下游顶点
Args:
	edge_type(string): 边的类型
"""
    
def inNeg(edge_type):
""" 承接顶点(只能跟在顶点操作之后),表示在该边类型下负采样上游顶点
Args:
	edge_type(string): 边的类型,当该边为有向边时, 调用inNeg报错
"""

(2)采样接口:

描述采样方式的接口,在前序路径描述的基础上,且当前操作对象为顶点时,如何采样该类型的顶点作为前序顶点的邻居,包括采样多少、采样策略。

def sample(N):
""" 承接顶点(只能跟在顶点操作之后),表示当前顶点采样多少作为前序顶点的邻域
Args:
	N(int): 采样的数量
"""

def by(strategy):
""" 紧跟sample()之后,表示采样策略
Args:
	strategy(string): 采样策略
    	如果前序路径为采样,支持的策略包括: 
          “edge_weight”, "in_degree", "topk", "random", "full";
    	如果前序路径为负采样,支持的策略包括: 
          "in_degree", "random", "node_weight"
"""

图游走

outV(edge_type)/outE(edge_type):从边的源顶点开始到目的顶点向前推进,即源顶点沿着其出边游走。 inV(edge_type)/inE(edge_type):从边的目的顶点开始到源顶点向后回溯,即目的顶点沿着入边游走。其中,edge_type对应的边必须为无向边(在构图时,必须指定directed=false,即可以反向游走,参考 图对象)。

例如,对于如下所示异构图,从user顶点开始,沿u2i边采样user的一跳邻居,再沿i2i边采样user的二跳邻居。

gsl-u-i-i

g.V("user").batch(64).alias('src')                       # (1) 获取64个user顶点
 .outV("u2i").sample(10).by("edge_weight").alias('hop1') # (2) 为上述每个user采样10个邻居item
 .outV("i2i").sample(15).by("random").alias('hop2')      # (3) 为上述每个item采样15个邻居item


若以边为数据源,可以再获取边后分别针对其端点进行操作。一般情况下,获取边往往用于无监督学习,把边看成正样本,再对边的源顶点进行负邻居采样作为负样本。

gsl-u-i-neg

g.E("u2i").batch(64).alias('edge')                    # (1) 随机获取64条u2i的边
 .ouV().alias('src')                                  # (2) 跳转到边的源顶点,即user
 .outNeg("u2i").sample(10).by("random").alias('neg')  # (3) 为上述每个user采样10个不相关的item


当边为无向边时,可以通过outV(edge_type)inV(edge_type)来实现循环采样,源点和目的点互为邻居。

gsl-i-i

g.V("user").batch(64).alias('src')                   # (1) 随机获取64个user顶点
 .outV("u2i").sample(10).by("random").alias('hop1')  # (2) 为上述每个user采样10个邻居item
 .inV("u2i").sample(5).by("random").alias('hop2')    # (3) 为上述每个item采样10个邻居user

图采样

图采样的Query接口组合如下:

# 沿出边采样邻居顶点
outV(edge_type).sample(N).by(strategy)

# 沿入边采样邻居顶点
inV(edge_type).sample(N).by(strategy)

# 沿出边采样邻居边
outE(edge_type).sample(N).by(strategy)

# 沿出边采样邻居边
inE(edge_type).sample(N).by(strategy)

GL目前已支持以下几种采样策略,对应**.by**接口的strategy参数。

strategy 说明
edge_weight 以边权重为概率采样
random 有放回随机采样
random_without_replacement 无放回随机采样,邻居数不够时,参考填充规则
topk 返回边权重topK的邻居,邻居数不够时,参考填充规则
in_degree 以顶点入度为概率采样
full 邻居数不足N时,返回全部邻居,大于N时,以N截断

填充规则:当采样要求的数据不足时,需要以某种方式填充返回结果。默认情况下,用default_neighbor_id填充补齐不足的iddefault_neighbor_id默认为0,可以通过gl.set_default_neighbor_id(xx)来设置。如要循环填充,即循环使用已有邻居id而非default_neighbor_id,则设置填充模式gl.CIRCULARgl.set_padding_mode(gl.CIRCULAR)

示例: 以下示例表示以64的batch size遍历图中的“用户“顶点,并按照“购买”边的权重为采样概率,对每个“用户”采样10个“用户”“购买”的“商品”。

g.V("user").batch(64).alias("u") \
 .outV('buy').sample(10).by("edge_weight").alias("1hop")

负采样

负采样的Query接口组合如下:

# 沿出边采样负邻居顶点(.where表示加上条件的负采样)
outNeg(edge_type).sample(expand_factor).by(strategy)[.where(target, condition={})]

# 沿入边采样负邻居顶点(.where表示加上条件的负采样)
inNeg(edge_type).sample(expand_factor).by(strategy)[.where(target, condition={})]

# 顶点表负采样
Neg(node_type).sample(expand_factor).by("node_weight")

sample接口的参数expand_factor,表示对每个顶点采样做少个负样本。 ​

GL目前已支持以下几种负采样策略,对应**.by**接口的strategy参数。

strategy 说明
random 随机负采样,不保证true-negative
in_degree 以顶点入度分布为概率进行负采样,保证true-negative
node_weight 按照顶点权重在顶点表负采样,仅适用于Neg(node_type)

where参数使用:

def where(target, condition={}):
  """ Add condition for negative samlpler. Used after `by`.

    Args:
      target (string): Alias of upstream  which is the postive sample
        that condition should match.
      condition (dict, optional): Keys are as following.
        "batch_share" (bool, optional): Whether sampled negative samples are
          shared by this batch. Defaults to False.
        "unique" (bool, optional): Whether sampled negtive samples are unique.
          Defaults to False.
        "int_cols" (int list, optional): int columns as condition.
          Defaults to []. 表示在这些指定的属性下进行负采样。
          比如target的int属性有3个,int_cols=[0,1]表示,在第一个int属性和target的第1个
          int属性一样的节点,以及第2个int属性和target的第2个属性一样的节点里选取负样本。
        "int_props" (float list, optional) : proportions of int columns.
          Defaults to []. 表示int_cols里每个属性采样的比例。
          比如int_cols=[0,1],int_props=[0.1,0.2],表示在和target的第1个int属性一样的点
          里采样expand_factor*0.1个负样本,在和dst_ids的
        第2个int属性一样的点里采样expand_factor*0.2个负样本。
        "float_cols" (int list, optional): float columns as condition.
          Defaults to [].
        "float_props" (float list, optional): proportions of float columns.
          Defaults to [].
        "str_cols" (int list, optional): string columns as condition.
          Defaults to [].
        "str_props" (float list, optional): proportions of string columns.
          Defaults to [].
    """

Note:

  • Decoder中attr_types描述为(“string”, Size)的属性,在condition中看做int属性。

  • int_props, float_props, str_props中的每一项必须为float类型,即如[1]需要写成[1.0]

示例:

  • 以下示例表示以64的batch size遍历图中的“用户“顶点,并对每个“用户”随机采样10个“用户”“未购买”的“商品”。

g.V("user").batch(64).alias("u") \
 .outNeg('buy').sample(10).by("random").alias("1hop") \
 .values()
  • 以下示例表示以64的batch size遍历图中的“购买“边,并对边上每个“用户”按条件采样10个“用户”“未购买”的“商品”,其中“商品”需要满足的条件是,和“用户”的第1个int属性相同的“商品”采样100.25个,和“用户”的第2个int属性相同的“商品”采样100.25个,和“用户”的第1个str属性相同的“商品”采样10*0.5个:

g.E("buy").batch(64).alias("e").each(
  lambda e: (
             e.inV().alias('dst'),
             e.outV().alias('src') \
              .outNeg("buy").sample(10).by('random').where(
                   "dst",
                   condition={
                     "int_cols": [0,1], "int_props": [0.25,0.25],
                     "str_cols": [0], "str_props": [0.5]}).alias('neg'))) \
              .values()

多路下游

each

一个典型的GNN算法对图数据的需求往往是多路的。each()接口用于表达Query的多分枝,当进行到某个阶段后,针对前序结果(可能有多个)分别进行不同的操作。

def each(func):
"""
Args:
	func(lambda): 构造子query的lambda表达式,表达式的输入为紧跟上游操作的输出。
Return:
	Query对象
"""

例如,在二部图(user-item-item)GraphSAGE算法中,我们为了学习user和item在同一空间下的相似性,首先会随机获取图的u2i边作为训练的正样本,再对边的源点(即user顶点)采样其负邻居(item顶点)产生负样本。user通过一阶邻居进行编码(把邻域信息聚合到中心节点),即需要采样其一跳邻居。item也需要一阶邻居来编码。图数据的访问模式如下所示。

gsl-u-i-i-neg

query = g.E("u2i").batch(512).shuffle().alis('edge')       # (1) 随机获取512条u2i的边
 .each(lambda edge:                                        # 上游输出的是边
    (edge.outV().alias('src').each(
      lambda src:
        (src.outV("u2i").sample(10).by("edge_weight").alias('src_1hop'), # (2) 为边的源顶点user采样10个邻居
         src.outNeg("u2i").sample(5).by("in_degree").alias('src_neg') \ # (4) 为边的源顶点user负采样5个邻居
            .outV("i2i").sample(10).by("random").alias('src_neg_1hop')), # (5) 为上述负采样的每个item采样其10个邻居
      edge.inV().alias('dst').outV("i2i").sample(10).by("random").alias('dst_1hop') # (3) 为边的目的顶点item采样10个邻居
    ).values()

变量

each接口用lambda function对上游进行多路下游操作。也可以将上游赋值给一个变量,用该变量接多路下游。

Note: 从同一个SOURCE游走的任意STEP,后面接**.values()**都表示这个Query描述完成。从同一个SOURCE游走的任意STEPs只能接一次vlaues(). 如下示例的第19行,edge.values()和src.values() / dst.values() 等价。

# 采样边
edge = g.E("u2i").batch(512).shuffle().alias(...)

# 获取边的源顶点
src = edge.outV().alias('src')
# 获取边的目的顶点
dst = edge.inV().alias('dst')

# 给源顶点采样邻居
src.outV("u2i").sample(10).by("edge_weight").alias('src_1hop')

# 给源顶点做负采样
src.outNeg("u2i").sample(5).by("in_degree").alias('src_neg')
   .outV("i2i").sample(10).by("random").alias('src_neg_1hop'))

# 给目的顶点采样邻居
dst.outV("i2i").sample(10).by("random").alias('dst_1hop')

query = edge.values()

命名

alias

每一个SOURCE或STEP都需要由alias命名,便于后续对输出结果的访问。

def alias(name):
""" 
Args:
	name(string): 给操作起一个别名。
Return:
	Query对象
"""

SINK

values

一句Query描述完Source、Step后,需要以.values()结尾,表示Query描述完整。Query中的任意一个节点.values()都表示该query结束。如:

query1 = g.E("u2i").batch(512).shuffle().alias("edges")
         .values()

src = g.V("user").batch(64).alias("src")
dst = src.outV("u2i").sample(5).by("random").alias('dst')
query2 = src.values() # 等价于query2 = dst.values()

GSL执行结果

GSL表达的Query以遍历图中的顶点或边开始,因此可以循环地执行。 GSL执行的结果是Query中描述的所有游走的顶点或边构成的对象,他们包含了自身的id属性、标签、权重。 ​

Dataset

graphlearn.Dataset接口,用于将Query的结果构造为Numpy组成的graphlearn.Nodes/graphlearn.Edgesgraphlearn.SparseNodes/graphlearn.Edges对象的生成器。Nodes等对象描述详见文档 数据对象

class Dataset:
    def __init__(self, qeury, window=10):
    """
    query: GSL query, 从g.V()/g.E()开始,以.values()结束
    window: Dataset会进行异步地预取数据,window为预取数据的大小。
    """
    
    def next(self):
    """
    返回一个dict,其中,key为Dataset对应的GSL query中的每一个alias,
    value为alias对应的Nodes或Edges的数据。
    *V()对应Nodes,*E()对应Edges。
    特殊地,full采样对应的输出为SparseNodes或SparseEdges。
    """

使用方式如下:

import graphlearn as gl
query = g.V("user").batch(64).alias('u') \
         .outV('buy').sample(10).by('random').alias('i') \
         .outE('i2i').sample(5).by('random').alias('i-i')
         .values()

ds = gl.Dataset(query)
while True:
  try:
    res = ds.next()
  except gl.OutOfRangeError:
    break

上例中,Dataset.next()的输出如下。

res = ds.next()
# { "u": Nodes, 'i': Nodes, 'i-i': Edges}

res["u"].ids
# Int64 np array, shape: [64]([batch_size])
res["u"].int_attrs
# Int64 np array, shape: [64, 2]([batch_size, int_attr_num]) or None
res["u"].float_attrs
# Float np array, shape: [64, 3]([batch_size, float_attr_num]) or None
res["u"].string_attrs
# String np array, shape: [64, 1]([batch_size, string_attr_num]) or None
res["u"].weights
# Float np array, shape: [64]([batch_size]) or None
res["i"].labels
# Int32 np array, shape: [64]([batch_size]) or None

res["u"].ids
# Int64 np array, shape: [64, 10]([batch_size, nb_count])
res["u"].int_attrs
# Int64 np array, shape: [64, 10, 2]([batch_size, nb_count, int_attr_num]) or None
res["u"].float_attrs
# Float np array, shape: [64, 10, 5]([batch_size, nb_count, float_attr_num]) or None
res["u"].string_attrs
# String np array, shape: [64, 10, 3]([batch_size, nb_count, string_attr_num]) or None
res["u"].weights
# Float np array, shape: [64, 10]([batch_size, nb_count])
res["i"].labels
# Int32 np array, shape: [64, 10]([batch_size, nb_count])

res["i-i"].src_ids
# Int64 np array, shape: [64 * 10, 5]([batch_size, nbr_count])
res["i-i"].dst_ids
# Int64 np array, shape: [64 * 10, 5]([batch_size, nb_count])
res["i-i"].int_attrs
# Int64 np array, shape: [64 * 10, 5, 1]([batch_size, nb_count, int_attr_num]) or None
res["i-i"].float_attrs
# Float np array, shape: [64 * 10, 5, 1]([batch_size, nb_count, float_attr_num]) or None
res["i-i"].string_attrs
# String np array, shape: [64 * 10, 5, 2]([batch_size, nb_count, string_attr_num]) or None
res["i-i"].weights
# Float np array, shape: [64 * 10, 5]([batch_size, nb_count])
res["i-i"].labels
# Int32 np array, shape: [64 * 10, 5]([batch_size, nb_count])

示例

  • 数据准备

vim gen_test_data.py

gen_test_data.py

def gen_files():
    import random

    u_count = 100
    i_count = 10

    with open("data/user", 'w') as f:
        s = 'id:int64\tweight:float\n'
        f.write(s)
        for i in range(u_count):
            s = '%d\t%f\n' % (i, i / 10.0)
            f.write(s)

    with open("data/item", 'w') as f:
        s = 'id:int64\tfeature:string\n'
        f.write(s)
        for i in range(100, 100 + i_count):
            s = '%d\t%s:%d:%f:%f:%s\n' % (i, str(i) + 's', i, i*1.0, i * 10.0, 'x')
            f.write(s)

    with open("data/u-i", 'w') as f:
        s = 'src_id:int64\tdst_id:int64\tweight:float\n'
        f.write(s)
        for i in range(u_count):
            for j in range(100, 100 + i_count):
                s = '%d\t%d\t%f\n' % (i, j, (i + j) * 0.1)
                f.write(s)

gen_files()

执行脚本

mkdir -p data
python gen_test_data.py
  • **图对象构建和GSL执行 **本示例中“buy”边为无向边。

vim test_local.py

test_local.py

import os
import sys

import graphlearn as gl

def test_node_iterate(graph):
  """Iterate users, sample 2 hops with path user-(buy)-item-(buy_reverse)-user.
  """
  query = graph.V("user").batch(32).shuffle(traverse=True).alias("src") \
          .outV("buy").sample(5).by("edge_weight").alias("hop1") \
          .inE("buy").sample(2).by("random").alias("hop1-hop2") \
          .inV().alias("hop2") \
          .values()

  ds = gl.Dataset(query)
  epoch = 2
  for i in range(epoch):
    step = 0
    while True:
      try:
        res = ds.next()
        step += 1

        src_nodes = res["src"]
        hop1_nodes = res["hop1"]
        hop1_hop2_edges = res["hop1-hop2"]
        hop2_nodes = res["hop2"]

        assert isinstance(src_nodes, gl.Nodes)
        assert isinstance(hop1_nodes, gl.Nodes)
        assert isinstance(hop1_hop2_edges, gl.Edges)
        assert isinstance(hop2_nodes, gl.Nodes)

        assert src_nodes.type == "user"
        assert hop1_nodes.type == "item"
        assert hop1_hop2_edges.type == ("item", "user", "buy_reverse")
        assert hop1_hop2_edges.edge_type == "buy_reverse"
        assert hop2_nodes.type == "user"

        if  step == 100 // 32 + 1:  # total user nodes count is 100
          assert tuple(src_nodes.ids.shape) == (100 % 32, )
          assert tuple(hop1_nodes.ids.shape) == (100 % 32, 5)
          assert tuple(hop1_hop2_edges.src_ids.shape) == (100 % 32 * 5, 2)
          assert tuple(hop1_hop2_edges.dst_ids.shape) == (100 % 32 * 5, 2)
          assert tuple(hop2_nodes.ids.shape) == (100 % 32 * 5, 2)

          assert tuple(hop1_nodes.float_attrs.shape) == (100 % 32, 5, 2)  # 2 float attrs
          assert tuple(hop1_hop2_edges.weights.shape) == (100 % 32 * 5, 2)
          assert tuple(hop2_nodes.weights.shape) == (100 % 32 * 5, 2)
        else:
          assert tuple(src_nodes.ids.shape) == (32, )
          assert tuple(hop1_nodes.ids.shape) == (32, 5)
          assert tuple(hop1_hop2_edges.src_ids.shape) == (32 * 5, 2)
          assert tuple(hop1_hop2_edges.dst_ids.shape) == (32 * 5, 2)
          assert tuple(hop2_nodes.ids.shape) == (32 * 5, 2)

          assert tuple(hop1_nodes.float_attrs.shape) == (32, 5, 2)  # 2 float attrs
          assert tuple(hop1_hop2_edges.weights.shape) == (32 * 5, 2)
          assert tuple(hop2_nodes.weights.shape) == (32 * 5, 2)
      except gl.OutOfRangeError:
        break

def test_edge_iterate(graph):
  """Iterate buy edges, sample hops of src and dst nodes.
    user-(buy)-item   (1) iterate edges
      |         |
    (buy)  (buy_reverse)
      |         |
    item       user   (2) sample neighbors of src and dst nodes. `
  """
  edges = graph.E("buy").batch(32).shuffle(traverse=True).alias("edges")
  src = edges.outV().alias("src")
  dst = edges.inV().alias("dst")
  neg = src.outNeg("buy").sample(2).by("in_degree").alias("neg")

  neg.inV("buy").sample(4).by("random").alias("neg_hop1")
  src.outE("buy").sample(5).by("random").alias("src_hop1_edges") \
     .inV().alias("src_hop1")
  dst.inE("buy").sample(3).by("edge_weight").alias("dst_hop1_edges") \
      .inV().alias("dst_hop1")

  query = edges.values()
  ds = gl.Dataset(query)
  epoch = 2
  for i in range(epoch):
    step = 0
    while True:
      try:
        res = ds.next()
        step += 1

        edges = res["edges"]
        src_nodes = res["src"]
        dst_nodes = res["dst"]
        neg_nodes = res["neg"]
        src_hop1_edges = res["src_hop1_edges"]
        src_hop1_nodes = res["src_hop1"]
        neg_hop1_nodes = res["neg_hop1"]
        dst_hop1_edges = res["dst_hop1_edges"]
        dst_hop1_nodes = res["dst_hop1"]

        assert edges.type == ("user", "item", "buy")
        assert src_nodes.type == "user"
        assert dst_nodes.type == "item"
        assert neg_nodes.type == "item"
        assert src_hop1_edges.type == ("user", "item", "buy")
        assert src_hop1_nodes.type == "item"
        assert neg_hop1_nodes.type == "user"
        assert dst_hop1_edges.type == ("item", "user", "buy_reverse")
        assert dst_hop1_nodes.type == "user"

        if step == 1000 // 32 + 1:  # total buy edges count is 1000
          assert tuple(neg_nodes.float_attrs.shape) == (1000 % 32, 2, 2)
          assert tuple(neg_hop1_nodes.weights.shape) == (1000 % 32 * 2, 4, )
          assert tuple(src_hop1_edges.weights.shape) == (1000 % 32, 5)
          assert tuple(src_hop1_nodes.float_attrs.shape) == (1000 % 32, 5, 2)
          assert tuple(dst_hop1_edges.weights.shape) == (1000 % 32, 3)
          assert tuple(dst_hop1_nodes.weights.shape) == (1000 % 32, 3)
        else:
          assert tuple(neg_nodes.float_attrs.shape) == (32, 2, 2)
          assert tuple(neg_hop1_nodes.weights.shape) == (32 * 2, 4, )
          assert tuple(src_hop1_edges.weights.shape) == (32, 5)
          assert tuple(src_hop1_nodes.float_attrs.shape) == (32, 5, 2)
          assert tuple(dst_hop1_edges.weights.shape) == (32, 3)
          assert tuple(dst_hop1_nodes.weights.shape) == (32, 3)

        src_ids = list(dst_hop1_edges.src_ids.flatten())
        dst_ids = list(dst_hop1_edges.dst_ids.flatten())
        weights = list(dst_hop1_edges.weights.flatten())
        for src_id, dst_id, weight in zip(src_ids, dst_ids, weights):
          assert abs(0.1 * (src_id + dst_id) - weight) < 10 ** -6

        src_ids = list(src_hop1_edges.src_ids.flatten())
        dst_ids = list(src_hop1_edges.dst_ids.flatten())
        weights = list(src_hop1_edges.weights.flatten())
        for src_id, dst_id, weight in zip(src_ids, dst_ids, weights):
          assert abs(0.1 * (src_id + dst_id) - weight) < 10 ** -6

      except gl.OutOfRangeError:
        break


def main(argv):
  cur_path = sys.path[0]

  # Step 1: Construct graph with data source.
  #   Edges:
  #     user<--(buy)-->item
  g = gl.Graph()
  g.node(os.path.join(cur_path, "data/user"),
         node_type="user", decoder=gl.Decoder(weighted=True)) \
    .node(os.path.join(cur_path, "data/item"),
          node_type="item", decoder=gl.Decoder(attr_types=['string', 'int', 'float', 'float', 'string'])) \
    .edge(os.path.join(cur_path, "data/u-i"),
          edge_type=("user", "item", "buy"), decoder=gl.Decoder(weighted=True), directed=False) \
    .init()

  # Step 2: Describe the queries on graph.
  test_node_iterate(g)
  test_edge_iterate(g)

  g.close()


if __name__ == "__main__":
  main(sys.argv[1:])

执行脚本

>>> python test_local.py

详细的示例见代码: query_examples.py