# Written by Bram Cohen
# see LICENSE.txt for license information

from BitTornado.piecebuffer import BufferPool
from threading import Lock
from time import time, strftime, localtime
import os
from os.path import exists, getsize, getmtime, basename
from traceback import print_exc
try:
    from os import fsync
except ImportError:
    fsync = lambda x: None
from bisect import bisect
try:
    True
except:
    True = 1
    False = 0

DEBUG = False

MAXREADSIZE = 32768
MAXLOCKSIZE = 1000000000L
MAXLOCKRANGE = 3999999999L   # only lock first 4 gig of file

_pool = BufferPool()
PieceBuffer = _pool.new

def dummy_status(fractionDone = None, activity = None):
    pass

class Storage:
    def __init__(self, files, piece_length, doneflag, config,
                 disabled_files = None):
        # can raise IOError and ValueError
        self.files = files
        self.piece_length = piece_length
        self.doneflag = doneflag
        self.disabled = [False] * len(files)
        self.file_ranges = []
        self.disabled_ranges = []
        self.working_ranges = []
        numfiles = 0
        total = 0L
        so_far = 0L
        self.handles = {}
        self.whandles = {}
        self.tops = {}
        self.sizes = {}
        self.mtimes = {}
        if config.get('lock_files', True):
            self.lock_file, self.unlock_file = self._lock_file, self._unlock_file
        else:
            self.lock_file, self.unlock_file = lambda x1,x2: None, lambda x1,x2: None
        self.lock_while_reading = config.get('lock_while_reading', False)
        self.lock = Lock()

        if not disabled_files:
            disabled_files = [False] * len(files)

        for i in xrange(len(files)):
            file, length = files[i]
            if doneflag.isSet():    # bail out if doneflag is set
                return
            self.disabled_ranges.append(None)
            if length == 0:
                self.file_ranges.append(None)
                self.working_ranges.append([])
            else:
                range = (total, total + length, 0, file)
                self.file_ranges.append(range)
                self.working_ranges.append([range])
                numfiles += 1
                total += length
                if disabled_files[i]:
                    l = 0
                else:
                    if exists(file):
                        l = getsize(file)
                        if l > length:
                            h = open(file, 'rb+')
                            h.truncate(length)
                            h.flush()
                            h.close()
                            l = length
                    else:
                        l = 0
                        h = open(file, 'wb+')
                        h.flush()
                        h.close()
                    self.mtimes[file] = getmtime(file)
                self.tops[file] = l
                self.sizes[file] = length
                so_far += l

        self.total_length = total
        self._reset_ranges()

        self.max_files_open = config['max_files_open']
        if self.max_files_open > 0 and numfiles > self.max_files_open:
            self.handlebuffer = []
        else:
            self.handlebuffer = None


    if os.name == 'nt':
        def _lock_file(self, name, f):
            import msvcrt
            for p in range(0, min(self.sizes[name], MAXLOCKRANGE), MAXLOCKSIZE):
                f.seek(p)
                msvcrt.locking(f.fileno(), msvcrt.LK_LOCK,
                               min(MAXLOCKSIZE, self.sizes[name]-p))

        def _unlock_file(self, name, f):
            import msvcrt
            for p in range(0, min(self.sizes[name], MAXLOCKRANGE), MAXLOCKSIZE):
                f.seek(p)
                msvcrt.locking(f.fileno(), msvcrt.LK_UNLCK,
                               min(MAXLOCKSIZE, self.sizes[name]-p))

    elif os.name == 'posix':
        def _lock_file(self, name, f):
            import fcntl
            fcntl.flock(f.fileno(), fcntl.LOCK_EX)

        def _unlock_file(self, name, f):
            import fcntl
            fcntl.flock(f.fileno(), fcntl.LOCK_UN)

    else:
        def _lock_file(self, name, f):
            pass

        def _unlock_file(self, name, f):
            pass


    def was_preallocated(self, pos, length):
        for file, begin, end in self._intervals(pos, length):
            if self.tops.get(file, 0) < end:
                return False
        return True


    def _sync(self, file):
        self._close(file)
        if self.handlebuffer:
            self.handlebuffer.remove(file)

    def sync(self):
        # may raise IOError or OSError
        for file in self.whandles.keys():
            self._sync(file)

    def set_readonly(self, f=None):
        if f is None:
            self.sync()
            return
        file = self.files[f][0]
        if self.whandles.has_key(file):
            self._sync(file)


    def get_total_length(self):
        return self.total_length


    def _open(self, file, mode):
        if self.mtimes.has_key(file):
            try:
                if self.handlebuffer is not None:
                    assert getsize(file) == self.tops[file]
                    newmtime = getmtime(file)
                    oldmtime = self.mtimes[file]
                    assert newmtime <= oldmtime+1
                    assert newmtime >= oldmtime-1
            except:
                if DEBUG:
                    print ( file+' modified: '
                            +strftime('(%x %X)', localtime(self.mtimes[file]))
                            +strftime(' != (%x %X) ?', localtime(getmtime(file))) )
                raise IOError('modified during download')
        try:
            return open(file, mode)
        except:
            if DEBUG:
                print_exc()
            raise


    def _close(self, file):
        f = self.handles[file]
        del self.handles[file]
        if self.whandles.has_key(file):
            del self.whandles[file]
            f.flush()
            self.unlock_file(file, f)
            f.close()
            self.tops[file] = getsize(file)
            self.mtimes[file] = getmtime(file)
        else:
            if self.lock_while_reading:
                self.unlock_file(file, f)
            f.close()

    def _close_file(self, file):
        if not self.handles.has_key(file):
            return
        self._close(file)
        if self.handlebuffer:
            self.handlebuffer.remove(file)

    def _get_file_handle(self, file, for_write):
        # returns an open handle for the file, reopening it read/write if
        # needed and keeping self.handlebuffer ordered from least- to
        # most-recently used so excess handles can be evicted
        if self.handles.has_key(file):
            if for_write and not self.whandles.has_key(file):
                self._close(file)
                try:
                    f = self._open(file, 'rb+')
                    self.handles[file] = f
                    self.whandles[file] = 1
                    self.lock_file(file, f)
                except (IOError, OSError), e:
                    if DEBUG:
                        print_exc()
                    raise IOError('unable to reopen '+file+': '+str(e))

            if self.handlebuffer:
                if self.handlebuffer[-1] != file:
                    self.handlebuffer.remove(file)
                    self.handlebuffer.append(file)
            elif self.handlebuffer is not None:
                self.handlebuffer.append(file)
        else:
            try:
                if for_write:
                    f = self._open(file, 'rb+')
                    self.handles[file] = f
                    self.whandles[file] = 1
                    self.lock_file(file, f)
                else:
                    f = self._open(file, 'rb')
                    self.handles[file] = f
                    if self.lock_while_reading:
                        self.lock_file(file, f)
            except (IOError, OSError), e:
                if DEBUG:
                    print_exc()
                raise IOError('unable to open '+file+': '+str(e))

            if self.handlebuffer is not None:
                self.handlebuffer.append(file)
                if len(self.handlebuffer) > self.max_files_open:
                    self._close(self.handlebuffer.pop(0))

        return self.handles[file]


    def _reset_ranges(self):
        self.ranges = []
        for l in self.working_ranges:
            self.ranges.extend(l)
        self.begins = [i[0] for i in self.ranges]

    def _intervals(self, pos, amount):
        # maps a byte range of the torrent onto the (file, begin, end)
        # slices of the individual files it spans
        r = []
        stop = pos + amount
        p = bisect(self.begins, pos) - 1
        while p < len(self.ranges):
            begin, end, offset, file = self.ranges[p]
            if begin >= stop:
                break
            r.append(( file,
                       offset + max(pos, begin) - begin,
                       offset + min(end, stop) - begin ))
            p += 1
        return r


    def read(self, pos, amount, flush_first = False):
        r = PieceBuffer()
        for file, pos, end in self._intervals(pos, amount):
            if DEBUG:
                print 'reading '+file+' from '+str(pos)+' to '+str(end)
            self.lock.acquire()
            h = self._get_file_handle(file, False)
            if flush_first and self.whandles.has_key(file):
                h.flush()
                fsync(h)
            h.seek(pos)
            while pos < end:
                length = min(end-pos, MAXREADSIZE)
                data = h.read(length)
                if len(data) != length:
                    raise IOError('error reading data from '+file)
                r.append(data)
                pos += length
            self.lock.release()
        return r

    def write(self, pos, s):
        # might raise an IOError
        total = 0
        for file, begin, end in self._intervals(pos, len(s)):
            if DEBUG:
                print 'writing '+file+' from '+str(pos)+' to '+str(end)
            self.lock.acquire()
            h = self._get_file_handle(file, True)
            h.seek(begin)
            h.write(s[total: total + end - begin])
            self.lock.release()
            total += end - begin

    def top_off(self):
        for begin, end, offset, file in self.ranges:
            l = offset + end - begin
            if l > self.tops.get(file, 0):
                self.lock.acquire()
                h = self._get_file_handle(file, True)
                h.seek(l-1)
                h.write(chr(0xFF))
                self.lock.release()

    def flush(self):
        # may raise IOError or OSError
        for file in self.whandles.keys():
            self.lock.acquire()
            self.handles[file].flush()
            self.lock.release()

    def close(self):
        for file, f in self.handles.items():
            try:
                self.unlock_file(file, f)
            except:
                pass
            try:
                f.close()
            except:
                pass
        self.handles = {}
        self.whandles = {}
        self.handlebuffer = None
    def _get_disabled_ranges(self, f):
        if not self.file_ranges[f]:
            return ((), (), ())
        r = self.disabled_ranges[f]
        if r:
            return r
        start, end, offset, file = self.file_ranges[f]
        if DEBUG:
            print 'calculating disabled range for '+self.files[f][0]
            print 'bytes: '+str(start)+'-'+str(end)
            print 'file spans pieces '+str(int(start/self.piece_length))+'-'+str(int((end-1)/self.piece_length)+1)
        pieces = range( int(start/self.piece_length),
                        int((end-1)/self.piece_length)+1 )
        offset = 0
        disabled_files = []
        if len(pieces) == 1:
            if ( start % self.piece_length == 0
                 and end % self.piece_length == 0 ):    # happens to be a single,
                                                        # perfect piece
                working_range = [(start, end, offset, file)]
                update_pieces = []
            else:
                midfile = os.path.join(self.bufferdir, str(f))
                working_range = [(start, end, 0, midfile)]
                disabled_files.append((midfile, start, end))
                length = end - start
                self.sizes[midfile] = length
                piece = pieces[0]
                update_pieces = [(piece, start-(piece*self.piece_length), length)]
        else:
            update_pieces = []
            if start % self.piece_length != 0:  # doesn't begin on an even piece boundary
                end_b = pieces[1]*self.piece_length
                startfile = os.path.join(self.bufferdir, str(f)+'b')
                working_range_b = [ ( start, end_b, 0, startfile ) ]
                disabled_files.append((startfile, start, end_b))
                length = end_b - start
                self.sizes[startfile] = length
                offset = length
                piece = pieces.pop(0)
                update_pieces.append((piece, start-(piece*self.piece_length), length))
            else:
                working_range_b = []
            if f != len(self.files)-1 and end % self.piece_length != 0:
                                                # doesn't end on an even piece boundary
                start_e = pieces[-1] * self.piece_length
                endfile = os.path.join(self.bufferdir, str(f)+'e')
                working_range_e = [ ( start_e, end, 0, endfile ) ]
                disabled_files.append((endfile, start_e, end))
                length = end - start_e
                self.sizes[endfile] = length
                piece = pieces.pop(-1)
                update_pieces.append((piece, 0, length))
            else:
                working_range_e = []
            if pieces:
                working_range_m = [ ( pieces[0]*self.piece_length,
                                      (pieces[-1]+1)*self.piece_length,
                                      offset, file ) ]
            else:
                working_range_m = []
            working_range = working_range_b + working_range_m + working_range_e

        if DEBUG:
            print str(working_range)
            print str(update_pieces)
        r = (tuple(working_range), tuple(update_pieces), tuple(disabled_files))
        self.disabled_ranges[f] = r
        return r


    def set_bufferdir(self, dir):
        self.bufferdir = dir

    def enable_file(self, f):
        if not self.disabled[f]:
            return
        self.disabled[f] = False
        r = self.file_ranges[f]
        if not r:
            return
        file = r[3]
        if not exists(file):
            h = open(file, 'wb+')
            h.flush()
            h.close()
        if not self.tops.has_key(file):
            self.tops[file] = getsize(file)
        if not self.mtimes.has_key(file):
            self.mtimes[file] = getmtime(file)
        self.working_ranges[f] = [r]

    def disable_file(self, f):
        if self.disabled[f]:
            return
        self.disabled[f] = True
        r = self._get_disabled_ranges(f)
        if not r:
            return
        for file, begin, end in r[2]:
            if not os.path.isdir(self.bufferdir):
                os.makedirs(self.bufferdir)
            if not exists(file):
                h = open(file, 'wb+')
                h.flush()
                h.close()
            if not self.tops.has_key(file):
                self.tops[file] = getsize(file)
            if not self.mtimes.has_key(file):
                self.mtimes[file] = getmtime(file)
        self.working_ranges[f] = r[0]

    reset_file_status = _reset_ranges


    def get_piece_update_list(self, f):
        return self._get_disabled_ranges(f)[1]


    def delete_file(self, f):
        try:
            os.remove(self.files[f][0])
        except:
            pass


    '''
    Pickled data format:

    d['files'] = [ file #, size, mtime {, file #, size, mtime...} ]
        file # in torrent, and the size and last modification
        time for those files.  Missing files are either empty
        or disabled.

    d['partial files'] = [ name, size, mtime... ]
        Names, sizes and last modification times of files containing
        partial piece data.
        Filenames go by the following convention:
        {file #, 0-based}{nothing, "b" or "e"}
        eg: "0e" "3" "4b" "4e"
        Where "b" specifies the partial data for the first piece in
        the file, "e" the last piece, and no letter signifying that
        the file is disabled but is smaller than one piece, and that
        all the data is cached inside so adjacent files may be
        verified.
    '''

    def pickle(self):
        files = []
        pfiles = []
        for i in xrange(len(self.files)):
            if not self.files[i][1]:    # length == 0
                continue
            if self.disabled[i]:
                for file, start, end in self._get_disabled_ranges(i)[2]:
                    pfiles.extend([basename(file), getsize(file), getmtime(file)])
                continue
            file = self.files[i][0]
            files.extend([i, getsize(file), getmtime(file)])
        return {'files': files, 'partial files': pfiles}

    def unpickle(self, data):
        # assume all previously-disabled files have already been disabled
        try:
            files = {}
            pfiles = {}

            l = data['files']
            assert len(l) % 3 == 0
            l = [l[x:x+3] for x in xrange(0, len(l), 3)]
            for f, size, mtime in l:
                files[f] = (size, mtime)

            l = data.get('partial files', [])
            assert len(l) % 3 == 0
            l = [l[x:x+3] for x in xrange(0, len(l), 3)]
            for file, size, mtime in l:
                pfiles[file] = (size, mtime)

            valid_pieces = {}
            for i in xrange(len(self.files)):
                if self.disabled[i]:
                    continue
                r = self.file_ranges[i]
                if not r:
                    continue
                start, end, offset, file = r
                if DEBUG:
                    print 'adding '+file
                for p in xrange( int(start/self.piece_length),
                                 int((end-1)/self.piece_length)+1 ):
                    valid_pieces[p] = 1

            if DEBUG:
                print valid_pieces.keys()

            def test(old, size, mtime):
                oldsize, oldmtime = old
                if size != oldsize:
                    return False
                if mtime > oldmtime+1:
                    return False
                if mtime < oldmtime-1:
                    return False
                return True

            for i in xrange(len(self.files)):
                if self.disabled[i]:
                    for file, start, end in self._get_disabled_ranges(i)[2]:
                        f1 = basename(file)
                        if ( not pfiles.has_key(f1)
                             or not test(pfiles[f1], getsize(file), getmtime(file)) ):
                            if DEBUG:
                                print 'removing '+file
                            for p in xrange( int(start/self.piece_length),
                                             int((end-1)/self.piece_length)+1 ):
                                if valid_pieces.has_key(p):
                                    del valid_pieces[p]
                    continue
                file, size = self.files[i]
                if not size:
                    continue
                if ( not files.has_key(i)
                     or not test(files[i], getsize(file), getmtime(file)) ):
                    start, end, offset, file = self.file_ranges[i]
                    if DEBUG:
                        print 'removing '+file
                    for p in xrange( int(start/self.piece_length),
                                     int((end-1)/self.piece_length)+1 ):
                        if valid_pieces.has_key(p):
                            del valid_pieces[p]
        except:
            if DEBUG:
                print_exc()
            return []

        if DEBUG:
            print valid_pieces.keys()
        return valid_pieces.keys()
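

# Illustrative sketch (not part of the original module): roughly what
# pickle() might return for a hypothetical two-file torrent in which file 1
# is disabled and its boundary pieces are cached in buffer files "1b" and
# "1e".  All names and numbers below are assumptions for illustration only.
#
#     {'files': [0, 262144, 1199145600],
#      'partial files': ['1b', 16384, 1199145600,
#                        '1e', 4096, 1199145600]}
#
# Passing such a dict back into unpickle() yields the list of piece indices
# whose on-disk data still matches the recorded sizes and mtimes and so can
# be accepted without re-hashing.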