We recently released Autograd for Torch, which greatly simplified our workflow when experimenting with complex deep learning architectures. The Twitter Cortex team is continuously investing in better tooling for manipulating our large datasets, and distributing training processes across machines in our cluster.
Today we’re open-sourcing four components of our training pipeline, so that the community using Torch and/or Autograd can simplify their workflows for parallelizing training and for manipulating large, distributed datasets.
The torch-distlearn library provides common distributed learning algorithms built in Torch with the help of the torch-ipc library. We’re starting by releasing two distributed learning algorithms:
AllReduceSGD spreads the computation of gradients across N processes. Each process is responsible for computing gradients on 1/N of the overall batch. Processes are organized in a binary tree structure, such that each reduce involves going up and down the tree. Assuming the user launches N processes, each consuming 1/N of the dataset (which is easy using torch-dataset, with Dataset(path, {partition=n, partitions=N})), each process runs code that looks like this:
local allReduceSGD = require 'distlearn.AllReduceSGD'(tree)

-- Ensure params are the same on all nodes:
allReduceSGD.synchronizeParameters(params)

for _ = 1,epochs do
   for _ = 1,steps do
      -- Compute your gradients as normal
      local grads = computeYourGrads(...)
      -- Sum and normalize them (this is synchronous):
      allReduceSGD.sumAndNormalizeGradients(grads)
      -- Do your SGD as normal
      sgdStep(params, grads)
   end
   -- Before validating we should make sure all nodes have
   -- the exact same parameter values (sometimes one process sees less
   -- data, which results in slightly different updates):
   allReduceSGD.synchronizeParameters(params)
   -- Validate...
end
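Both this snippet and the AllReduceEA example below assume a tree handle that connects the N processes. As a rough sketch of how that tree can be built with torch-ipc (loosely based on the examples shipped with torch-distlearn; the option names opt.nodeIndex, opt.numNodes, opt.host and opt.port are placeholders, and the exact Tree arguments may differ, so check the examples):

-- Sketch: build the tree of N processes with torch-ipc.
-- (Based on the torch-distlearn examples; opt.* values are placeholders
-- and the exact Tree() arguments may differ.)
local ipc = require 'libipc'
local Tree = require 'ipc.Tree'

-- node 1 acts as the root and waits for the other N-1 nodes;
-- every other node connects to it:
local server, client
if opt.nodeIndex == 1 then
   server = ipc.server(opt.host, opt.port)
   server:clients(opt.numNodes - 1, function(client) end)
else
   client = ipc.client(opt.host, opt.port)
end

-- arrange the processes into a binary tree (base 2), which is what
-- AllReduceSGD and AllReduceEA reduce over:
local tree = Tree(opt.nodeIndex, opt.numNodes, 2, server, client,
   opt.host, opt.port + opt.nodeIndex)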
AllReduceEA is an asynchronous method for distributing SGD, based on elastic averaging. It is equally easy to use, but requires two additional meta-parameters:
-- Use a tau of 10 and an alpha of 0.2 (these are meta-parameters
-- that typically need tuning):
local allReduceEA = require 'distlearn.AllReduceEA'(tree, 10, 0.2)

-- Ensure params are the same on all nodes:
allReduceEA.synchronizeParameters(params)

for _ = 1,epochs do
   for _ = 1,steps do
      -- Compute your gradients as normal
      local grads = computeYourGrads(...)
      -- Do your SGD as normal
      sgdStep(params, grads)
      -- Average the params:
      allReduceEA.averageParameters(params)
   end
   -- Make sure the centers haven't drifted too far due to
   -- accumulated floating point precision error:
   allReduceEA.synchronizeCenter(params)
   -- Validate...
end
The torch-dataset library provides many convenient ways to feed data into your model. It keeps sampling, reading, and processing distinct, so you can mix and match solutions to fit your particular problem constraints. When training classifiers, control over sampling is critical to achieving good performance; the abstractions in this package let us seamlessly experiment with various sampling procedures without having to preprocess our data. Partitioning is also supported and plays nicely with sampling, so you can easily distribute a job that processes your dataset, or trains a model on it.
All datasets are represented by an index that references the raw data. We currently support four types of indexes.
The CSV index is particularly interesting to us: it allows datasets to be arbitrarily distributed, with no need to pre-fetch the data before processing it. It is especially powerful for media datasets (images, videos, audio), where we usually store the index in an index.csv file and the actual records on a CDN for very fast retrieval during training. Index CSV files can live on the local file system or out on a SlowFS. Index files are typically small and fast to manipulate, even for very large datasets. In practice, this means that you can run a new training or processing script on an empty machine with zero overhead, as the actual data gets downloaded as you start requesting it.
Just use a path like slowfs:///user/somebody/path/to/index.csv to load an index directly off of a SlowFS. Here is a very simple example index.csv:
filename,label1,label2
https://your.super.fast.cdn.com/somedir/2baf2949f9416ca0311.jpg,XX
https://your.super.fast.cdn.com/somedir/2baf2949f9416adcjd2.jpg,XX,XY
https://your.super.fast.cdn.com/somedir/2baf2949f9416dcdcd1.jpg,AA
https://your.super.fast.cdn.com/somedir/2baf2949f9416dcdgb3.jpg,AA,BB
And here is an example index for CIFAR-10, hosted on Amazon S3. In this case, the URLs have been factored out, and the prefix URL is provided in a separate meta file. Given a CSV index like this one, you would consume it like this:
Dataset = require 'dataset.Dataset'

dataset, numBatches = Dataset('http://d3jod65ytittfm.cloudfront.net/dataset/cifar10/training.csv')

getBatch, numBatches = dataset.sampledBatcher({
   -- request batches of 10 elements:
   batchSize = 10,
   -- input dimensions for each single element in the batch:
   inputDims = { 3, 32, 32 },
   -- how to sample from the csv index
   -- (could be lots of other things, like uniform, label-uniform…):
   samplerKind = 'linear',
   -- each element fetched triggers a call to this processor, which
   -- is called in a separate lua env/thread (all processors run in
   -- parallel):
   processor = function(fetched, opt, decoded)
      -- the goal of the processor is to transform the raw asset
      -- into a decoded tensor of the right size. Here any user
      -- codec could be used (thrift, json, jpg, mp4, …):
      local image = require 'image'
      local bytes = torch.ByteTensor(
         torch.ByteStorage():string(fetched)
      )
      local pixels = image.decompressPNG(bytes)
      decoded:copy(pixels)
      return true
   end,
})

-- now we can simply request batches like this:
for i = 1,numBatches do
   local batch = getBatch()
   print(batch.input, batch.target)
   -- batch.input is a 10x3x32x32 tensor
   -- while batch.target is a 10-dim tensor
end
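As the comment on samplerKind suggests, changing the sampling strategy is a one-line change. For instance, a sketch of a label-uniform variant, which draws labels uniformly instead of scanning the index linearly, would only swap the sampler; this sketch assumes the same processor function has been kept in a local called processor, and options beyond those shown above may be needed:

-- same dataset, same processor; only the sampler changes:
local getBalancedBatch, numBalancedBatches = dataset.sampledBatcher({
   batchSize = 10,
   inputDims = { 3, 32, 32 },
   samplerKind = 'label-uniform', -- sample labels uniformly rather than linearly
   processor = processor,         -- assumes the processor above was stored in a local
})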
Any other type of data can be supported by simply defining another type of processor. The dataset object can also be created with a partition ID such that the index is automatically sharded, and each process only sees a subset of it.
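For example, using the option form mentioned earlier (Dataset(path, {partition=n, partitions=N})), a minimal sketch of partitioning the CIFAR-10 index above across 4 processes, as seen from process 2, looks like this:

local Dataset = require 'dataset.Dataset'

-- this process only sees (roughly) a quarter of the rows in the index:
local dataset = Dataset(
   'http://d3jod65ytittfm.cloudfront.net/dataset/cifar10/training.csv',
   { partition = 2, partitions = 4 }
)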
The torch-thrift library provides a codec for the Thrift format. It supports very fast deserialization of arbitrary Thrift binary data into Lua native types, as well as serialization of Lua native types back into Thrift binary, given a schema. We use Thrift heavily at Twitter to represent the data records in most of our structured datasets (think Tweet + engagement data + image + user info, etc.).
Example: assume that our dataset above is instead a dataset of Thrift records, each containing a binary image and metadata about that image. Everything would remain the same except for the dataset processor, which would now look like this:
-- let’s assume a thrift record that’s defined like this:
-- struct MediaRecord {
--    1: required string jpeg           // the raw jpeg
--    2: optional string caption        // a user caption
--    3: optional list<i32> resolution  // original resolution
-- }

-- the processor would now be:
processor = function(fetched, opt, decoded)
   -- fetched is now a thrift record
   local t = require 'libthrift'.codec()
   local record = t:read(fetched)
   -- the record is now a regular lua table that respects
   -- the nested structure of the thrift definition
   local imageData = record[1]
   local image = require 'image'
   local bytes = torch.ByteTensor(
      torch.ByteStorage():string(imageData)
   )
   local pixels = image.decompressJPG(bytes)
   local caption = record[2] or '<no caption>'
   local resolution = record[3] or {}
   -- return decoded image + free-form metadata:
   local metadata = {
      caption = caption,
      originalResolution = resolution,
   }
   decoded:copy(pixels)
   return true, metadata
end
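Serialization works the other way around: create a codec from a description of the desired Thrift types, then call its write method. The following is only an illustrative sketch; the schema table and field layout shown here are hypothetical, and the exact format expected by torch-thrift may differ (see its README):

local thrift = require 'libthrift'

-- Sketch only: a codec that can write a struct with an i32, a string,
-- and a list of doubles (field ids 1, 2, 3). The exact schema format
-- may differ from what torch-thrift expects.
local codec = thrift.codec({
   ttype = 'struct',
   fields = {
      [1] = 'i32',
      [2] = 'string',
      [3] = { ttype = 'list', value = 'double' },
   },
})

-- Lua table in, Thrift binary out:
local binary = codec:write({ 42, 'a caption', { 1.0, 2.0, 3.0 } })

-- and back again:
local record = codec:read(binary)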
The torch-ipc library provides a set of primitives that extend Torch for high-performance parallel computation across thread and process boundaries. These primitives are the backbone of our distributed training stack: work queues, client-server connections, grouping processes into a tree, and so on.
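As a small taste of that API, here is a hedged sketch of a work queue shared between the main thread and a pool of map workers (the queue name and the doubling work are made up for illustration):

local ipc = require 'libipc'

-- create a named work queue owned by the main thread:
local q = ipc.workqueue('demo')

-- spin up 2 worker threads; each opens the queue by name, reads
-- numbers off it, and writes their doubles back until it reads nil:
local workers = ipc.map(2, function()
   local ipc = require 'libipc'
   local q = ipc.workqueue('demo')
   while true do
      local x = q:read()
      if x == nil then break end
      q:write(2 * x)
   end
end)

-- feed some work and collect the answers:
for i = 1,4 do q:write(i) end
for i = 1,4 do print(q:read()) end

-- tell both workers to exit, then wait for them:
q:write(nil)
q:write(nil)
workers:join()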
To get started, first install Torch by following the instructions on torch.ch. Next, install our five packages with luarocks, Lua’s package manager:
luarocks install autograd
luarocks install thrift
luarocks install dataset
luarocks install ipc
luarocks install distlearn
At this point, you should be able to run all the code snippets above and, most importantly, the examples we provide:
git clone git@github.com:twitter/torch-distlearn.git
cd torch-distlearn/examples
th mnist.lua     # single process
./mnist.sh       # 4 processes, distributed using AllReduce SGD
./mnist-ea.sh    # 4 processes, distributed using elastic averaging
We recommend starting with the MNIST example, or the CIFAR-10 example in the same directory; both demonstrate how to put these packages together to train a model across multiple processes.
We hope you enjoy these tools and give us some feedback. We accept pull requests and issues on GitHub. Happy sharding!