Intro
Multiple Devices, Single Machine
- Check that your GPU cards have NVIDIA Compute Capability > 3.0
- Alternative using AWS: helpful blog post
- Google Cloud service: uses TPU hardware
- Which to use? (Tim Dettmers)
- Download CUDA & cuDNN, set their environment variables
- Use the nvidia-smi command to check the installation
- install TF with GPU support
- Open a Python shell and verify TF detects CUDA & cuDNN
import tensorflow as tf
sess = tf.Session()   # the startup log should report each detected GPU device
Managing GPU RAM
- Option 1: TF grabs all the RAM on every visible GPU the first time a graph runs. To run a 2nd TF program while the 1st is still running, give each process its own GPU cards. (Below: program #1 sees GPUs 0,1; program #2 sees GPUs 2,3.)
$ CUDA_VISIBLE_DEVICES=0,1 python3 program_1.py
$ CUDA_VISIBLE_DEVICES=3,2 python3 program_2.py
- Option 2: tell TF to grab only a fraction of each GPU's memory. (Below: 40% allocation.)
import tensorflow as tf
config = tf.ConfigProto()
config.gpu_options.per_process_gpu_memory_fraction = 0.4
session = tf.Session(config=config)
config, session
Placing Ops on Devices
- The TF whitepaper describes a dynamic placer algorithm that distributes ops across all available devices automatically, but it is not available (yet) in open-source TF.
Simple Placement
- Mostly up to you. To pin ops to a specific device, wrap them in a tf.device() block. Below: a, b pinned to /cpu:0; c can go anywhere.
import tensorflow as tf
with tf.device("/cpu:0"):
    a = tf.Variable(3.0)   # pinned to /cpu:0
    b = tf.Variable(4.0)   # pinned to /cpu:0
c = a * b                  # not pinned: the placer decides
c
Logging Placements
- Set log_device_placement=True in the session config. This tells the placer to log a message whenever it places a node on a device.
import tensorflow as tf
config = tf.ConfigProto()
config.log_device_placement = True
sess = tf.Session(config=config)
print(config,"\n",sess)
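A quick check (a sketch, reusing a, b, c and sess from the cells above): placement messages are printed the first time the nodes run.
sess.run(a.initializer)
sess.run(b.initializer)
print(sess.run(c))   # 12.0; each node's placement (e.g. mul -> /cpu:0) is logged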
Dynamic Placement
- You can specify a function instead of a device name when creating a device block; TF calls it for every op placed in the block and uses the device it returns.
import tensorflow as tf

def variables_on_cpu(op):
    # send Variable ops to the CPU, everything else to the first GPU
    if op.type == "Variable":
        return "/cpu:0"
    else:
        return "/gpu:0"

with tf.device(variables_on_cpu):
    a = tf.Variable(3.0)   # /cpu:0
    b = tf.constant(4.0)   # /gpu:0
    c = a * b              # /gpu:0
c
Ops & Kernels
- TF operations need a kernel implementation for a device in order to run on it. Not all ops have kernels for both GPUs and CPUs; for example, TF has no GPU kernel for integer variables. Changing i (below) from 3 to 3.0 lets the op run on the GPU.
import tensorflow as tf
with tf.device("/gpu:0"):
    i = tf.Variable(3)        # integer variable: no GPU kernel for this op

sess = tf.Session()
sess.run(i.initializer)       # fails: the /gpu:0 placement cannot be satisfied
- To allow TF to "fall back" to a CPU instead, use allow_soft_placement=True.
import tensorflow as tf
with tf.device("/gpu:0"):
    i = tf.Variable(3)

config = tf.ConfigProto()
config.allow_soft_placement = True
sess = tf.Session(config=config)
sess.run(i.initializer)   # the placer runs and falls back to /cpu:0
print(sess.run(i))        # 3
Parallel Execution
- TF evaluates nodes with zero pending dependencies first. If those nodes sit on separate devices, they run in parallel; if they sit on the same device, they are dispatched to different threads and may still run in parallel.
Control Dependencies
- Use control dependencies to postpone node evaluation (e.g., to avoid hogging memory prematurely).
import tensorflow as tf
a = tf.constant(1.0)
b = a + 2.0
with tf.control_dependencies([a, b]):
    x = tf.constant(3.0)   # x and y are not evaluated before a and b
    y = tf.constant(4.0)
z = x + y

with tf.Session() as sess:
    print(sess.run(z))   # 7.0
Multiple Devices - Multiple Servers
- cluster: one or more TF servers ("tasks"), usually spread across several machines. Each task belongs to a job, a named collection of related tasks.
- "ps" = parameter server
- "worker" = computing engine
cluster_spec = tf.train.ClusterSpec({
"ps": [
"machine-a.example.com:2221", # /job:ps/task:0
],
"worker": [
"machine-a.example.com:2222", # /job:worker/task:0
"machine-b.example.com:2222", # /job:worker/task:1
]})
cluster_spec
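Each task must start its own server; server.join() below assumes one was created. A sketch (run on the machine that owns the task, with the matching job name and index):
server = tf.train.Server(cluster_spec, job_name="worker", task_index=0)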
server.join()
# blocks main thread until server stops (i.e., never)
Opening a Session
'''NOT YET WORKING (requires the cluster above to be running)
import tensorflow as tf
a = tf.constant(1.0)
b = a + 2
c = a * 3
with tf.Session("grpc://machine-b.example.com:2222") as sess:
    print(c.eval())   # 9.0
'''
Master & Worker Services
- clients talk to servers using the gRPC protocol (based on HTTP/2, bidirectional streaming)
- data is serialized with protocol buffers
- every server provides both the master service and the worker service
Pinning Ops Across Tasks
- you can pin ops to any device
- ex:
'''NOT WORKING YET (requires the cluster to be running)
with tf.device("/job:ps/task:0/cpu:0"):
    a = tf.constant(1.0)
with tf.device("/job:worker/task:0/gpu:1"):   # any task/device spec works, e.g. /job:worker/task:0/cpu:0
    b = a + 2
c = a + b   # not pinned: placed on the default device of the session's task
'''
Sharding Variables across Multiple Param Servers
- sharding across servers mitigates risk of network card saturation
- TF distributes variables across all "ps" tasks in a round-robin fashion via tf.train.replica_device_setter()
'''NOT WORKING YET (requires the cluster to be running)
import tensorflow as tf
with tf.device(tf.train.replica_device_setter(ps_tasks=2)):
    v1 = tf.Variable(1.0)   # pinned to /job:ps/task:0
    v2 = tf.Variable(2.0)   # pinned to /job:ps/task:1
    v3 = tf.Variable(3.0)   # pinned to /job:ps/task:0
    v4 = tf.Variable(4.0)   # pinned to /job:ps/task:1
    v5 = tf.Variable(5.0)   # pinned to /job:ps/task:0
'''
Sharing State across Sessions (Resource Containers)
- local session: all variables are managed by the session itself and vanish when it closes
- distributed session: variables are managed by resource containers on the cluster, so they persist (and are shared) across sessions
'''# simple_client.py
import tensorflow as tf
import sys

x = tf.Variable(0.0, name="x")
increment_x = tf.assign(x, x + 1)

with tf.Session(sys.argv[1]) as sess:
    if sys.argv[2:] == ["init"]:
        sess.run(x.initializer)
    sess.run(increment_x)
    print(x.eval())
'''
# launch a client that connects to machine B and reuses the shared variable x
# python3 simple_client.py grpc://machine-b.example.com:2222
# 2.0   (x already existed in the cluster's resource container and had been incremented once before)
Async Communications (TF Queues)
- Queueing data
- Dequeuing data
- Queues of tuples
- Closing a queue
- RandomShuffleQueue
- PaddingFIFOQueue
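A minimal sketch of the basic queue API using a local session (the queue name, shapes, and values are illustrative):
import tensorflow as tf

q = tf.FIFOQueue(capacity=10, dtypes=[tf.float32], shapes=[[2]],
                 name="q", shared_name="shared_q")   # shared_name lets other sessions reuse the queue
training_instance = tf.placeholder(tf.float32, shape=[2])
enqueue_op = q.enqueue([training_instance])
dequeue_op = q.dequeue()
close_op = q.close()

with tf.Session() as sess:
    sess.run(enqueue_op, feed_dict={training_instance: [1.0, 2.0]})
    sess.run(enqueue_op, feed_dict={training_instance: [3.0, 4.0]})
    sess.run(close_op)              # no more enqueues allowed; pending items can still be dequeued
    print(sess.run(dequeue_op))     # [1. 2.]
    print(sess.run(dequeue_op))     # [3. 4.]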
Loading Data Directly from Graph
- Needed to avoid saturating the network / file-server bandwidth (don't ship the training data on every step)
- Preloading the data into a variable (works when the dataset fits in memory)
Reading data from graph with reader operations
- CSV, binary, TFRecords
- TextLineReader reads file lines one-by-one
- record identifier (string): filename:linenumber
- tf.decode_csv(val, record_defaults=[...])
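The "TO RUN THE GRAPH" cell below refers to a filename queue and reader ops that are not defined in these notes; a sketch of that missing piece (the CSV layout and record_defaults are assumptions):
import tensorflow as tf

# queue of CSV filenames to read
filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])
filename = tf.placeholder(tf.string)
enqueue_filename = filename_queue.enqueue([filename])
close_filename_queue = filename_queue.close()

# read one line at a time; the key is "filename:linenumber"
reader = tf.TextLineReader(skip_header_lines=1)
key, value = reader.read(filename_queue)

# parse two float features and one int target
x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])
features = tf.stack([x1, x2])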
'''TO LOAD A GRAPH
instance_queue = tf.RandomShuffleQueue(
    capacity=10,
    min_after_dequeue=2,
    dtypes=[tf.float32, tf.int32],
    shapes=[[2], []],
    name="instance_q",
    shared_name="shared_instance_q")

enqueue_instance = instance_queue.enqueue([features, target])
close_instance_queue = instance_queue.close()
'''
'''TO RUN THE GRAPH
with tf.Session([...]) as sess:
    sess.run(enqueue_filename, feed_dict={filename: "my_test.csv"})
    sess.run(close_filename_queue)
    try:
        while True:
            sess.run(enqueue_instance)
    except tf.errors.OutOfRangeError as ex:
        pass   # no more records in the current file and no more files to read
    sess.run(close_instance_queue)
'''
Multithreaded readers using a Coordinator & QueueRunner
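A sketch of the multithreaded version (assumes instance_queue, enqueue_instance, enqueue_filename, close_filename_queue and filename from the cells above; one shared reader, so for truly parallel reads you would build one reader per thread):
queue_runner = tf.train.QueueRunner(instance_queue, [enqueue_instance] * 5)   # 5 enqueue threads

with tf.Session() as sess:
    sess.run(enqueue_filename, feed_dict={filename: "my_test.csv"})
    sess.run(close_filename_queue)
    coord = tf.train.Coordinator()                                            # coordinates thread shutdown
    enqueue_threads = queue_runner.create_threads(sess, coord=coord, start=True)
    coord.join(enqueue_threads)                                               # wait for the readers to finish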
Other convenience functions
- string_input_producer()
- tf.train.start_queue_runners()
producer functions = each creates a queue (plus a QueueRunner to feed it)
- input_producer()
- range_input_producer()
- slice_input_producer()
shuffle_batch(list_of_tensors)
- creates a RandomShuffleQueue
- creates a QueueRunner to enqueue the tensors (added to the GraphKeys.QUEUE_RUNNERS collection)
- returns a dequeue_many() op that yields a mini-batch from the queue
batch(): same as shuffle_batch() but with a plain FIFOQueue (no shuffling)
- batch_join(): like batch(), but multiple tensor lists feed the same queue (one per reader thread)
- shuffle_batch_join(): like batch_join(), but with a RandomShuffleQueue
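A sketch tying the convenience functions together (the CSV filename, shapes, and batch size are assumptions):
import tensorflow as tf

filename_queue = tf.train.string_input_producer(["my_test.csv"])   # creates the queue + a QueueRunner
reader = tf.TextLineReader(skip_header_lines=1)
key, value = reader.read(filename_queue)
x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])
features = tf.stack([x1, x2])

# RandomShuffleQueue + QueueRunner under the hood; returns a dequeue_many() op
X_batch, y_batch = tf.train.shuffle_batch(
    [features, target], batch_size=4, capacity=10, min_after_dequeue=2)

with tf.Session() as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)   # starts every registered QueueRunner
    print(sess.run([X_batch, y_batch]))
    coord.request_stop()
    coord.join(threads)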
One NN per Device
near-linear speedup: training 100 nets across 50 servers with 2 GPUs each takes roughly as long as training 1 net on 1 GPU (perfect for hyperparameter tuning)
potential option for serving the trained nets at scale: TF Serving, released 2/2016
In-Graph vs Between-Graph Replication (for Ensembles)
- Two approaches to building ensembles:
1) one big graph containing every network, one session talking to any server in the cluster ("in-graph replication")
2) one separate graph per network, with synchronization handled yourself ("between-graph replication"), typically via queues -- considered more flexible
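A toy sketch of in-graph replication: every replica lives in one big graph, each pinned to its own device, and the aggregation op lives in the same graph (the one-layer replicas, device strings, and replica count are illustrative; it assumes a cluster with three worker tasks):
import tensorflow as tf

X = tf.placeholder(tf.float32, shape=[None, 2])

replica_outputs = []
for i in range(3):
    with tf.device("/job:worker/task:%d" % i), tf.variable_scope("replica_%d" % i):
        W = tf.get_variable("W", shape=[2, 1], initializer=tf.zeros_initializer())
        replica_outputs.append(tf.matmul(X, W))

# the ensemble prediction aggregates all replicas inside the same graph
ensemble_prediction = tf.reduce_mean(tf.stack(replica_outputs), axis=0)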
# RunOptions.timeout_in_ms: abort a run() call that takes too long (e.g., a dequeue from a dead replica)
'''NOT YET
with tf.Session([...]) as sess:
    [...]
    run_options = tf.RunOptions()
    run_options.timeout_in_ms = 1000   # 1s timeout
    try:
        pred = sess.run(dequeue_prediction, options=run_options)
    except tf.errors.DeadlineExceededError as ex:
        [...]   # the dequeue operation timed out after 1s
'''
# Alternative: set operation_timeout_in_ms in the session config to apply a timeout to every operation (below)
'''NOT YET
config = tf.ConfigProto()
config.operation_timeout_in_ms = 1000   # 1s timeout for every operation
with tf.Session([...], config=config) as sess:
    [...]
    try:
        pred = sess.run(dequeue_prediction)
    except tf.errors.DeadlineExceededError as ex:
        [...]   # the dequeue operation timed out after 1s
'''
Model Parallelism
- Chopping a single model into chunks and running each chunk on a different device
- Fully Connected Nets: not much to gain from this
- Vertical & horizontal slicing don't work well either (too much cross-device communication)
- Nets with partially connected layers (CNNs): easier to distribute
- Some RNNs use memory cells whose output at step t is fed back as input at step t+1, which makes them easier to split across devices (e.g., one layer per device)
Data Parallelism
- Sync updates: an aggregator waits for all gradients, averages them, and applies the result. Slow devices can delay every step, and the replicas all pulling the parameters at once can saturate the parameter servers' bandwidth.
- Async updates: more training steps per minute. Issue: "stale gradients" (gradients computed from parameters that have since been updated) slow convergence and add noise/wobble. To mitigate:
  - reduce the learning rate
  - drop or scale back stale gradients
  - adjust the mini-batch size
  - start the first few epochs with just one replica ("warmup phase")
- Bandwidth: at some point adding GPUs stops helping because network saturation limits data traffic (see the Google report on this). Steps you can take:
  - group GPUs on a few servers rather than spreading them out (avoids network hops)
  - shard the parameters across multiple parameter servers
  - drop precision from float32 to bfloat16
  - 8-bit precision ("quantization"): mainly used for deploying/running models on mobile phones
- How TF does it:
  - you choose 1) the replication type (in-graph or between-graph) and 2) the update type (async or sync)
    1) in-graph + sync: one big graph
    2) in-graph + async: 1 optimizer per replica, 1 thread per replica
    3) between-graph + sync: wrap the optimizer in a SyncReplicasOptimizer
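A minimal construction sketch of combination 3 (between-graph + sync); the tiny linear model and the replica count are illustrative assumptions:
import tensorflow as tf

# hypothetical model: in a real setup this is your network's loss
X = tf.placeholder(tf.float32, shape=[None, 2])
y = tf.placeholder(tf.float32, shape=[None, 1])
theta = tf.Variable(tf.zeros([2, 1]))
loss = tf.reduce_mean(tf.square(tf.matmul(X, theta) - y))

global_step = tf.train.get_or_create_global_step()
base_optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.01)

# wait for gradients from 3 replicas, average them, then apply a single update
sync_optimizer = tf.train.SyncReplicasOptimizer(base_optimizer,
                                                replicas_to_aggregate=3,
                                                total_num_replicas=3)
training_op = sync_optimizer.minimize(loss, global_step=global_step)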