https://gist.github.com/2914601
import multiprocessing, os, sys, time
from itertools import izip
import redis
rr = None
def process_chunk(chunk):
global rr
if rr is None:
rr = redis.Redis() # Create one connection per process
print 'PID %d' % os.getpid()
pl = rr.pipeline(transaction=False)
lc = 0
for line in chunk:
sample = dict(izip(cols, line.split(',')))
key = "%s" % (sample['hhmi']])
bps = int(sample['data'])
for f in fields:
pl.hincrby("%s:%s" % (key, f), sample[f], bps)
lc = lc + 1
pl.execute()
return lc
def grouper(n, iterable):
return izip(*[iter(iterable)]*n)
if __name__ == '__main__':
cols = #list of all columns in the data set / header
cols = cols.split(',')
fields = #list of fields to parse
fields = fields.split(',')
csv = open('0000.csv','rU')
pool = multiprocessing.Pool(6)
c = 0
lc = 0
jobs = []
for chunk in grouper(100, csv.readlines()):
c += 1
jobs.append(chunk)
print '%d chunks' % c
stime = time.time()
rc = pool.map(process_chunk, jobs)
rtime = (time.time()-stime)+.001 # Avoid div/0
for l in rc: lc += l
print("END: %d/s" % int(lc/rtime))