Intro

Multiple Devices, Single Machine

  • Check that your GPU cards have NVIDIA Compute Capability > 3.0
  • Alternative using AWS: helpful blog post
  • Google Cloud service: uses TPU hardware
  • Which to use? (Tim Dettmers)
  • Download CUDA & cuDNN, set their environment variables
  • Use the nvidia-smi command to check the installation
  • Install TF with GPU support
  • Open a Python shell, verify TF detects CUDA & cuDNN

    import tensorflow as tf

    sess = tf.Session()  # with a GPU build, the startup log lists the CUDA & cuDNN libraries it found

Managing GPU RAM

  • TF grabs all the RAM of every visible GPU the first time you run a graph. To run a 2nd TF program while the 1st is still running, give each process its own GPU cards via CUDA_VISIBLE_DEVICES. (Below: program #1 sees GPUs 0,1; program #2 sees GPUs 2,3.)

$ CUDA_VISIBLE_DEVICES=0,1 python3 program_1.py

$ CUDA_VISIBLE_DEVICES=3,2 python3 program_2.py

  • Option 2: tell TF to grab only a fraction of each GPU's memory. (Below: 40% allocation.)
import tensorflow as tf

config = tf.ConfigProto()
config.gpu_options.per_process_gpu_memory_fraction = 0.4
session = tf.Session(config=config)

Placing Ops on Devices

  • The TF whitepaper describes a dynamic placement algorithm that distributes ops across all available devices, but it is not available (yet) in open-source TF.

Simple Placement

  • Placement is mostly up to you. To pin ops to a specific device, use the device() function. Below: a, b are pinned to cpu #0; c can go anywhere.
import tensorflow as tf

with tf.device("/cpu:0"):
    a, b = tf.Variable(3.0), tf.Variable(4.0)  # pinned to /cpu:0
c = a * b  # not pinned; the placer puts it on the default device
c

Logging Placements

  • Use log_device_placement=True. This tells the placer to log a message whenever it places a node.
import tensorflow as tf

config = tf.ConfigProto()
config.log_device_placement = True
sess = tf.Session(config=config)
print(config,"\n",sess)

Dynamic Placement

  • You can specify a function instead of a device when creating a device block.
import tensorflow as tf

def variables_on_cpu(op):
    if op.type == "Variable":
        return "/cpu:0"  # pin all variables to the CPU
    else:
        return "/cpu:0"  # everything else too (on a GPU machine you might return "/gpu:0" here)
with tf.device(variables_on_cpu):
    a = tf.Variable(3.0)
    b = tf.constant(4.0)
    c = a * b
c

Ops & Kernels

  • TF operations need to define a kernel to run on a device. Not all ops have kernels for both GPUs and CPUs. Example: TF doesn't have an integer kernel for GPU variables. Changing i (below) from 3 to 3.0 should allow the op to run.
import tensorflow as tf
with tf.device("/gpu:0"):
    i = tf.Variable(3)  # int32 variable: no GPU kernel
sess = tf.Session()
test = sess.run(i.initializer)  # raises InvalidArgumentError: no GPU kernel and no fallback allowed
test
  • To allow TF to "fall back" to a CPU instead, use allow_soft_placement=True.
with tf.device("/gpu:0"):
    i = tf.Variable(3)
config = tf.ConfigProto()
config.allow_soft_placement = True
sess = tf.Session(config=config)
test = sess.run(i.initializer) # the placer runs and falls back to /cpu:0
print(test)

Parallel Execution

  • TF evaluates any nodes with zero dependencies (source nodes) first. If those nodes are on separate devices, they run in parallel. If they are on the same device, they are evaluated in different threads of that device's thread pools, so they may still run in parallel.
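  • The thread pools mentioned above can be sized through ConfigProto; a minimal sketch (the thread counts are illustrative, not recommendations):
import tensorflow as tf

config = tf.ConfigProto()
config.inter_op_parallelism_threads = 4   # pool for running independent ops in parallel
config.intra_op_parallelism_threads = 8   # pool used inside a single op (e.g., large matmuls)
sess = tf.Session(config=config)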

Control Dependencies

  • Use control dependencies to postpone the evaluation of some nodes (e.g., to avoid hogging memory prematurely).
import tensorflow as tf
a = tf.constant(1.0)
b = a + 2.0

with tf.control_dependencies([a, b]):
    x = tf.constant(3.0)  # x and y will not be evaluated
    y = tf.constant(4.0)  # until a and b have been

print(x + y)

Multiple Devices - Multiple Servers

  • cluster: one or more TF servers ("tasks"), usually spread across several machines. Tasks belong to jobs (named collections of related tasks).
  • "ps" = parameter server (holds the model parameters)
  • "worker" = computing engine
import tensorflow as tf

cluster_spec = tf.train.ClusterSpec({
    "ps": [
        "machine-a.example.com:2221",  # /job:ps/task:0
    ],
    "worker": [
        "machine-a.example.com:2222",  # /job:worker/task:0
        "machine-b.example.com:2222",  # /job:worker/task:1
    ]})

server = tf.train.Server(cluster_spec, job_name="worker", task_index=0)  # start the server for this task
server.join()  # blocks the main thread until the server stops (i.e., never)

Opening a Session

# requires the cluster above to be running on machine-b
a = tf.constant(1.0)
b = a + 2
c = a * 3
with tf.Session("grpc://machine-b.example.com:2222") as sess:
    print(c.eval())  # 9.0

Master & Worker Services

  • clients talk to servers over the gRPC protocol (based on HTTP/2, with bidirectional streams)
  • data is exchanged as protocol buffers
  • every server provides both a master service and a worker service

Pinning Ops Across Tasks

  • you can pin ops to any device in any task, e.g.:
# requires the cluster above to be running
with tf.device("/job:ps/task:0/cpu:0"):
    a = tf.constant(1.0)
with tf.device("/job:worker/task:0/cpu:0"):
    b = a + 2
with tf.device("/job:worker/task:0/gpu:1"):
    c = a + b

Sharding Variables across Multiple Param Servers

  • sharding variables across parameter servers mitigates the risk of saturating a single server's network card
  • TF distributes variables across all "ps" tasks in round-robin fashion
import tensorflow as tf

# replica_device_setter() pins each new variable to the next "ps" task, round-robin
with tf.device(tf.train.replica_device_setter(ps_tasks=2)):
    v1 = tf.Variable(1.0)  # pinned to /job:ps/task:0
    v2 = tf.Variable(2.0)  # pinned to /job:ps/task:1
    v3 = tf.Variable(3.0)  # pinned to /job:ps/task:0
    v4 = tf.Variable(4.0)  # pinned to /job:ps/task:1
    v5 = tf.Variable(5.0)  # pinned to /job:ps/task:0

Sharing State across Sessions (Resource Containers)

  • local session: all variables are managed by the session itself and vanish when it ends
  • distributed session: variables are managed by resource containers on the cluster and persist across sessions
# simple_client.py
import tensorflow as tf
import sys

x = tf.Variable(0.0, name="x")
increment_x = tf.assign(x, x + 1)

with tf.Session(sys.argv[1]) as sess:
    if sys.argv[2:] == ["init"]:
        sess.run(x.initializer)
    sess.run(increment_x)
    print(x.eval())
# launch a client that connects to machine B and reuses the shared variable x:
# python3 simple_client.py grpc://machine-b.example.com:2222
# 2.0

Async Communications (TF Queues)

  • Queueing data (see the minimal sketch after this list)
  • Dequeueing data
  • Queues of tuples
  • Closing a queue
  • RandomShuffleQueue
  • PaddingFIFOQueue
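  • A minimal local sketch of the queue workflow above (the queue name, sizes, and data are illustrative; a real setup would add a shared_name and use a distributed session):
import tensorflow as tf

q = tf.FIFOQueue(capacity=10, dtypes=[tf.float32], shapes=[[2]], name="q")

x = tf.placeholder(tf.float32, shape=[2])
enqueue = q.enqueue([x])   # push one instance
dequeue = q.dequeue()      # pop one instance
close_q = q.close()        # signal that no more items will be enqueued

with tf.Session() as sess:
    sess.run(enqueue, feed_dict={x: [1.0, 2.0]})
    sess.run(enqueue, feed_dict={x: [3.0, 4.0]})
    sess.run(close_q)
    print(sess.run(dequeue))  # first instance
    print(sess.run(dequeue))  # second instance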

Loading Data Directly from the Graph

  • Needed to avoid file-server (bandwidth) saturation
  • Preloading data into variables
  • Reading data from the graph with reader operations

    • CSV, binary, TFRecords
    • TextLineReader reads file lines one-by-one
    • record identifier (string): filename:linenumber
    • tf.decode_csv(val, record_defaults=[...])
    • (a minimal reader sketch follows this list)
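  • A minimal sketch of the reader side (assumes a CSV with two float features and one int target per line; the names are illustrative and are reused by the queue code below):
import tensorflow as tf

filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])
filename = tf.placeholder(tf.string)
enqueue_filename = filename_queue.enqueue([filename])
close_filename_queue = filename_queue.close()

reader = tf.TextLineReader(skip_header_lines=1)
key, value = reader.read(filename_queue)   # key is "filename:linenumber"
x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])
features = tf.stack([x1, x2])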
# Build an instance queue that training ops will dequeue minibatches from
# (features and target come from the reader sketch above).
instance_queue = tf.RandomShuffleQueue(
    capacity=10,
    min_after_dequeue=2,
    dtypes=[tf.float32, tf.int32],
    shapes=[[2], []],
    name="instance_q",
    shared_name="shared_instance_q")

enqueue_instance = instance_queue.enqueue([features, target])
close_instance_queue = instance_queue.close()
# Run the loading pipeline (the session target is elided; requires the queues above):
with tf.Session([...]) as sess:
    sess.run(enqueue_filename, feed_dict={filename: "my_test.csv"})
    sess.run(close_filename_queue)
    try:
        while True:
            sess.run(enqueue_instance)
    except tf.errors.OutOfRangeError as ex:
        pass  # no more records in the current file and no more files to read
    sess.run(close_instance_queue)

Multithreaded readers using a Coordinator & QueueRunner
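  • A minimal sketch building on the reader and instance queue above (the 5-thread count is illustrative):
queue_runner = tf.train.QueueRunner(instance_queue, [enqueue_instance] * 5)

with tf.Session() as sess:
    sess.run(enqueue_filename, feed_dict={filename: "my_test.csv"})
    sess.run(close_filename_queue)
    coord = tf.train.Coordinator()
    enqueue_threads = queue_runner.create_threads(sess, coord=coord, start=True)
    # ... the training loop would dequeue minibatches here ...
    coord.request_stop()
    coord.join(enqueue_threads)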

Other convenience functions

  • string_input_producer() - takes a list of filenames and creates a filename queue plus a QueueRunner to feed it
  • tf.train.start_queue_runners() - starts every QueueRunner registered in the graph
  • other producer functions (they also create queues):
    • input_producer()
    • range_input_producer()
    • slice_input_producer()
  • shuffle_batch(list_of_tensors)

    • creates a RandomShuffleQueue
    • creates a QueueRunner (added to GraphKeys.QUEUE_RUNNERS)
    • returns a dequeue_many() op = pops a minibatch from the queue

    • batch() - same idea, but with a plain FIFOQueue (no shuffling)

    • batch_join() / shuffle_batch_join() - dequeue from multiple tensor lists (e.g., one per reader); see the end-to-end sketch after this list
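  • A minimal end-to-end sketch using these helpers (the filename, CSV layout, batch size, and capacities are illustrative assumptions):
import tensorflow as tf

filename_queue = tf.train.string_input_producer(["my_test.csv"], num_epochs=1)
reader = tf.TextLineReader(skip_header_lines=1)
key, value = reader.read(filename_queue)
x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])
features = tf.stack([x1, x2])
X_batch, y_batch = tf.train.shuffle_batch(
    [features, target], batch_size=2, capacity=10, min_after_dequeue=2)

with tf.Session() as sess:
    sess.run(tf.local_variables_initializer())  # num_epochs uses a local variable
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    try:
        while not coord.should_stop():
            print(sess.run([X_batch, y_batch]))
    except tf.errors.OutOfRangeError:
        pass  # no more instances
    finally:
        coord.request_stop()
        coord.join(threads)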

One NN per Device

  • near-linear speedup: training 100 nets across 50 servers (2 GPUs each) takes roughly the same time as training 1 net on 1 GPU (perfect for hyperparameter tuning)

  • potential option: TensorFlow Serving, released Feb 2016

In-Graph vs Between-Graph Replication (for Ensembles)

  • Two approaches to building ensembles:

1) one big graph containing every network, one session, run on any server in the cluster ("in-graph replication")

2) one separate graph per network, with synchronization handled yourself ("between-graph replication"), typically via queues; considered more flexible

  • RunOptions: set timeout_in_ms so a run call doesn't block forever (e.g., on a dequeue):
# pseudocode - session target and surrounding code elided
with tf.Session([...]) as sess:
    [...]
    run_options = tf.RunOptions()
    run_options.timeout_in_ms = 1000  # 1s timeout
    try:
        pred = sess.run(dequeue_prediction, options=run_options)
    except tf.errors.DeadlineExceededError as ex:
        [...]  # the dequeue operation timed out after 1s
  • Alternatively, set operation_timeout_in_ms in the session config to apply a timeout to every operation:
# pseudocode - session target and surrounding code elided
config = tf.ConfigProto()
config.operation_timeout_in_ms = 1000  # 1s timeout for every operation
with tf.Session([...], config=config) as sess:
    [...]
    try:
        pred = sess.run(dequeue_prediction)
    except tf.errors.DeadlineExceededError as ex:
        [...]  # the dequeue operation timed out after 1s

Model Parallelism

  • Chopping a model into chunks and running each chunk on a different device
  • Fully connected nets: not much value in doing this
  • Vertical & horizontal slicing don't work well either

  • Nets with partially connected layers (e.g., CNNs): easier to distribute

  • Some RNNs use memory cells whose output at step t is fed back as input at step t+1, which makes it easier to split layers across devices and pipeline them

Data Parallelism

  • Sync updates: an aggregator waits for all gradients to be available, averages them, and applies the result. Can be delayed by slow devices; the parameters can also saturate the servers' bandwidth.
  • Async updates: more training steps per minute. Issue: "stale gradients" (a replica computes gradients from parameters that have since been updated), which slows convergence and introduces noise/wobble. To mitigate:
    • reduce the learning rate
    • drop or scale back stale gradients
    • adjust the minibatch size
    • start the first few epochs with just one replica ("warmup phase")
  • Bandwidth: at some point adding more GPUs doesn't help because network saturation won't allow more data traffic (see the Google report). Steps you can take:
    • group GPUs on a single server (avoids network hops)
    • shard parameters across servers
    • drop precision from float32 to bfloat16
    • 8-bit precision ("quantization"): see mobile phone apps
  • How TF does it: you choose 1) the replication type (in-graph or between-graph) and 2) the update type (async or sync):
    1) in-graph + sync: one big graph
    2) in-graph + async: 1 optimizer per replica, 1 thread per replica
    3) between-graph + sync: wrap the optimizer in a SyncReplicasOptimizer (see the sketch after this list)
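  • A minimal sketch of option 3 (between-graph + sync); the replica count, learning rate, and loss tensor are illustrative assumptions:
import tensorflow as tf

n_replicas = 4  # illustrative
base_optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.01)
optimizer = tf.train.SyncReplicasOptimizer(
    base_optimizer,
    replicas_to_aggregate=n_replicas,
    total_num_replicas=n_replicas)
training_op = optimizer.minimize(loss)  # loss: this replica's loss tensor, defined elsewhere
sync_hook = optimizer.make_session_run_hook(is_chief=True)  # pass to a MonitoredTrainingSession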
