Defining and training the graph for asynchronous updates
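As discussed earlier and shown in the diagram here, in asynchronous updates every worker task sends its parameter updates as soon as it is ready, and the parameter server updates the parameters and sends them back. There is no synchronization, waiting, or aggregation of parameter updates: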
The full code for this example is in ch-15_mnist_dist_async.py. You are encouraged to modify and explore the code with your own datasets.
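To make the asynchronous behaviour concrete before looking at the TensorFlow code, here is a minimal sketch that imitates it with plain Python threads. It is not the chapter's implementation: the ParameterServer class, the worker function, the scalar weight, and the quadratic loss are all hypothetical stand-ins chosen for illustration. Each worker pushes its gradient as soon as it has one and receives the latest parameter back, and no worker waits for the others.

```python
import threading


class ParameterServer:
    """Toy stand-in for a parameter server holding one scalar weight (hypothetical)."""

    def __init__(self, w=0.0):
        self.w = w
        self.global_step = 0
        # The lock only keeps a single update atomic; it does not make
        # workers wait for each other or aggregate their gradients.
        self._lock = threading.Lock()

    def push_and_pull(self, grad, lr):
        # Apply the update as soon as it arrives and send the new value back.
        with self._lock:
            self.w -= lr * grad
            self.global_step += 1
            return self.w


def worker(ps, n_steps, lr, target):
    w = ps.w  # local copy; it may be stale by the time the gradient is applied
    for _ in range(n_steps):
        grad = 2.0 * (w - target)  # gradient of (w - target)**2 at the local copy
        w = ps.push_and_pull(grad, lr)


ps = ParameterServer()
threads = [threading.Thread(target=worker, args=(ps, 200, 0.05, 3.0))
           for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Three workers with 200 local steps each give 600 global steps in total.
print(ps.global_step, round(ps.w, 3))
```

The order in which the workers' updates are applied changes from run to run, which mirrors why the global step values in the logs later in this section follow no fixed pattern.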
For asynchronous updates, the graph is created and trained with the following steps:
- The graph is defined inside a with block:
with tf.device(device_func):
- Create the global step variable using the built-in TensorFlow function:
global_step = tf.train.get_or_create_global_step()
- Alternatively, this variable can be defined as:
global_step = tf.Variable(0, name='global_step', trainable=False)
- Define the datasets, parameters, and hyperparameters as usual:
x_test = mnist.test.images
y_test = mnist.test.labels
n_outputs = 10 # 0-9 digits
n_inputs = 784 # total pixels
learning_rate = 0.01
n_epochs = 50
batch_size = 100
n_batches = int(mnist.train.num_examples/batch_size)
n_epochs_print=10
- Define the placeholders, weights, biases, logits, cross-entropy, loss op, train op, and accuracy as usual:
# input images
x_p = tf.placeholder(dtype=tf.float32,
                     name='x_p',
                     shape=[None, n_inputs])
# target output
y_p = tf.placeholder(dtype=tf.float32,
                     name='y_p',
                     shape=[None, n_outputs])
w = tf.Variable(tf.random_normal([n_inputs, n_outputs],
                                 name='w'
                                 )
                )
b = tf.Variable(tf.random_normal([n_outputs],
                                 name='b'
                                 )
                )
logits = tf.matmul(x_p, w) + b
entropy_op = tf.nn.softmax_cross_entropy_with_logits(labels=y_p,
                                                     logits=logits
                                                     )
loss_op = tf.reduce_mean(entropy_op)
optimizer = tf.train.GradientDescentOptimizer(learning_rate)
train_op = optimizer.minimize(loss_op, global_step=global_step)
correct_pred = tf.equal(tf.argmax(logits, 1), tf.argmax(y_p, 1))
accuracy_op = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
These definitions will change when we learn how to build the synchronous update.
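As a reminder of what the loss and accuracy ops in the previous step compute, the following sketch reproduces the same arithmetic in plain Python on a tiny made-up batch. The helper names (softmax, cross_entropy, argmax) and the three-class data are illustrative assumptions and are not part of the chapter's code; the sketch only mirrors the softmax cross-entropy, the mean over the batch, and the argmax comparison.

```python
import math


def softmax(logits):
    # Subtract the maximum before exponentiating for numerical stability.
    m = max(logits)
    exps = [math.exp(z - m) for z in logits]
    total = sum(exps)
    return [e / total for e in exps]


def cross_entropy(one_hot, logits):
    # Cross-entropy between a one-hot label and the softmax of the logits.
    probs = softmax(logits)
    return -sum(y * math.log(p) for y, p in zip(one_hot, probs))


def argmax(values):
    return max(range(len(values)), key=values.__getitem__)


# Made-up batch with two examples and three classes (illustrative only).
batch_logits = [[2.0, 0.5, 0.1], [0.2, 0.1, 3.0]]
batch_labels = [[1, 0, 0], [0, 1, 0]]

# Counterpart of loss_op: mean cross-entropy over the batch.
losses = [cross_entropy(y, z) for y, z in zip(batch_labels, batch_logits)]
loss = sum(losses) / len(losses)

# Counterpart of accuracy_op: fraction of examples whose argmax matches the label.
correct = [argmax(z) == argmax(y) for y, z in zip(batch_labels, batch_logits)]
accuracy = sum(correct) / len(correct)

print(loss, accuracy)
```

Here the first example is classified correctly and the second is not, so the accuracy is 0.5, and the second example dominates the mean loss.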
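- TensorFlow provides a supervisor class that helps create sessions for training and is very useful in a distributed training setup. Create a supervisor object as follows: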
init_op = tf.global_variables_initializer()
sv = tf.train.Supervisor(is_chief=is_chief,
                         init_op=init_op,
                         global_step=global_step)
- Use the supervisor object to create a session, and run the training under this session block as usual:
with sv.prepare_or_wait_for_session(server.target) as mts:
    lstep = 0  # local step counter for this worker
    for epoch in range(n_epochs):
        for batch in range(n_batches):
            x_batch, y_batch = mnist.train.next_batch(batch_size)
            feed_dict = {x_p: x_batch, y_p: y_batch}
            _, loss, gstep = mts.run([train_op, loss_op, global_step],
                                     feed_dict=feed_dict)
            lstep += 1
        # report progress every n_epochs_print epochs
        if (epoch + 1) % n_epochs_print == 0:
            print('worker={},epoch={},global_step={}, '
                  'local_step={}, loss = {}'
                  .format(FLAGS.task_index, epoch, gstep, lstep, loss))
    # evaluate on the test set once training is finished
    feed_dict = {x_p: x_test, y_p: y_test}
    accuracy = mts.run(accuracy_op, feed_dict=feed_dict)
    print('worker={}, final accuracy = {}'
          .format(FLAGS.task_index, accuracy))
On starting the parameter server, we get the following output:
$ python3 ch-15_mnist_dist_async.py --job_name='ps' --task_index=0
I tensorflow/core/common_runtime/gpu/gpu_device.cc:1030] Found device 0 with properties:
name: Quadro P5000 major: 6 minor: 1 memoryClockRate(GHz): 1.506
pciBusID: 0000:01:00.0
totalMemory: 15.89GiB freeMemory: 15.79GiB
I tensorflow/core/common_runtime/gpu/gpu_device.cc:1120] Creating TensorFlow device (/device:GPU:0) -> (device: 0, name: Quadro P5000, pci bus id: 0000:01:00.0, compute capability: 6.1)
E1213 16:50:14.023235178 27224 ev_epoll1_linux.c:1051] grpc epoll fd: 23
I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job ps -> {0 -> localhost:9001}
I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job worker -> {0 -> localhost:9002, 1 -> localhost:9003, 2 -> localhost:9004}
I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:324] Started server with target: grpc://localhost:9001
On starting the worker tasks, we get the following three outputs:
Output from worker 1:
$ python3 ch-15_mnist_dist_async.py --job_name='worker' --task_index=0
I tensorflow/core/common_runtime/gpu/gpu_device.cc:1030] Found device 0 with properties:
name: Quadro P5000 major: 6 minor: 1 memoryClockRate(GHz): 1.506
pciBusID: 0000:01:00.0
totalMemory: 15.89GiB freeMemory: 9.16GiB
I tensorflow/core/common_runtime/gpu/gpu_device.cc:1120] Creating TensorFlow device (/device:GPU:0) -> (device: 0, name: Quadro P5000, pci bus id: 0000:01:00.0, compute capability: 6.1)
E1213 16:50:37.516609689 27507 ev_epoll1_linux.c:1051] grpc epoll fd: 23
I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job ps -> {0 -> localhost:9001}
I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job worker -> {0 -> localhost:9002, 1 -> localhost:9003, 2 -> localhost:9004}
I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:324] Started server with target: grpc://localhost:9002
I tensorflow/core/distributed_runtime/master_session.cc:1004] Start master session 1421824c3df413b5 with config: gpu_options { per_process_gpu_memory_fraction: 0.2 } allow_soft_placement: true
worker=0,epoch=9,global_step=10896, local_step=5500, loss = 1.2575616836547852
worker=0,epoch=19,global_step=22453, local_step=11000, loss = 0.7158586382865906
worker=0,epoch=29,global_step=39019, local_step=16500, loss = 0.43712112307548523
worker=0,epoch=39,global_step=55513, local_step=22000, loss = 0.3935799300670624
worker=0,epoch=49,global_step=72002, local_step=27500, loss = 0.3877961337566376
worker=0, final accuracy = 0.8865000009536743
Output from worker 2:
$ python3 ch-15_mnist_dist_async.py --job_name='worker' --task_index=1
I tensorflow/core/common_runtime/gpu/gpu_device.cc:1030] Found device 0 with properties:
name: Quadro P5000 major: 6 minor: 1 memoryClockRate(GHz): 1.506
pciBusID: 0000:01:00.0
totalMemory: 15.89GiB freeMemory: 12.43GiB
I tensorflow/core/common_runtime/gpu/gpu_device.cc:1120] Creating TensorFlow device (/device:GPU:0) -> (device: 0, name: Quadro P5000, pci bus id: 0000:01:00.0, compute capability: 6.1)
E1213 16:50:36.684334877 27461 ev_epoll1_linux.c:1051] grpc epoll fd: 23
I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job ps -> {0 -> localhost:9001}
I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job worker -> {0 -> localhost:9002, 1 -> localhost:9003, 2 -> localhost:9004}
I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:324] Started server with target: grpc://localhost:9003
I tensorflow/core/distributed_runtime/master_session.cc:1004] Start master session 2bd8a136213a1fce with config: gpu_options { per_process_gpu_memory_fraction: 0.2 } allow_soft_placement: true
worker=1,epoch=9,global_step=11085, local_step=5500, loss = 0.6955764889717102
worker=1,epoch=19,global_step=22728, local_step=11000, loss = 0.5891970992088318
worker=1,epoch=29,global_step=39074, local_step=16500, loss = 0.4183048903942108
worker=1,epoch=39,global_step=55599, local_step=22000, loss = 0.32243454456329346
worker=1,epoch=49,global_step=72105, local_step=27500, loss = 0.5384714007377625
worker=1, final accuracy = 0.8866000175476074
Output from worker 3:
$ python3 ch-15_mnist_dist_async.py --job_name='worker' --task_index=2
I tensorflow/core/common_runtime/gpu/gpu_device.cc:1030] Found device 0 with properties:
name: Quadro P5000 major: 6 minor: 1 memoryClockRate(GHz): 1.506
pciBusID: 0000:01:00.0
totalMemory: 15.89GiB freeMemory: 15.70GiB
I tensorflow/core/common_runtime/gpu/gpu_device.cc:1120] Creating TensorFlow device (/device:GPU:0) -> (device: 0, name: Quadro P5000, pci bus id: 0000:01:00.0, compute capability: 6.1)
E1213 16:50:35.568349791 27449 ev_epoll1_linux.c:1051] grpc epoll fd: 23
I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job ps -> {0 -> localhost:9001}
I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job worker -> {0 -> localhost:9002, 1 -> localhost:9003, 2 -> localhost:9004}
I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:324] Started server with target: grpc://localhost:9004
I tensorflow/core/distributed_runtime/master_session.cc:1004] Start master session cb0749c9f5fc163e with config: gpu_options { per_process_gpu_memory_fraction: 0.2 } allow_soft_placement: true
I tensorflow/core/distributed_runtime/master_session.cc:1004] Start master session 55bf9a2b9718a571 with config: gpu_options { per_process_gpu_memory_fraction: 0.2 } allow_soft_placement: true
worker=2,epoch=9,global_step=37367, local_step=5500, loss = 0.8077645301818848
worker=2,epoch=19,global_step=53859, local_step=11000, loss = 0.26333487033843994
worker=2,epoch=29,global_step=70299, local_step=16500, loss = 0.6506651043891907
worker=2,epoch=39,global_step=76999, local_step=22000, loss = 0.20321622490882874
worker=2,epoch=49,global_step=82499, local_step=27500, loss = 0.4170967936515808
worker=2, final accuracy = 0.8894000053405762
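We printed both the global step and the local step. The global step counts the steps taken by all the worker tasks together, whereas the local step counts the steps taken within a single worker task. That is why the local step reaches 27,500 and is the same at each epoch for every worker, while the global step shows no symmetry or pattern across epochs or workers: each worker advances the global step at its own pace. We also see that the final accuracy differs from worker to worker, because each worker evaluates the final accuracy at a different time, when the parameters hold different values.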
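The step counts in the logs can be checked with a few lines of arithmetic. The sketch below assumes that mnist.train.num_examples is 55,000, which is consistent with the 5,500 local steps logged after 10 epochs; the variable names num_train_examples and n_workers are introduced here for illustration only.

```python
num_train_examples = 55000  # assumption: size of the MNIST training split used here
batch_size = 100
n_epochs = 50
n_epochs_print = 10
n_workers = 3  # three worker tasks were started

# Batches, and therefore local steps, per epoch for each worker.
n_batches = int(num_train_examples / batch_size)

# Local step values at the epochs where the training loop prints.
local_steps_at_print = [(epoch + 1) * n_batches
                        for epoch in range(n_epochs)
                        if (epoch + 1) % n_epochs_print == 0]

# Every local step of every worker increments the shared global step once.
total_global_updates = n_workers * n_epochs * n_batches

print(n_batches)             # 550
print(local_steps_at_print)  # [5500, 11000, 16500, 22000, 27500]
print(total_global_updates)  # 82500
```

The total of 82,500 updates is in line with the last global step logged by the worker that finished last (82499), while the intermediate global step values depend on how far the other workers had progressed at that moment.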