# Written by Bram Cohen
# see LICENSE.txt for license information

from BitTornado.piecebuffer import BufferPool
from threading import Lock
from time import time, strftime, localtime
import os
from os.path import exists, getsize, getmtime, basename
from traceback import print_exc
try:
    from os import fsync
except ImportError:
    fsync = lambda x: None
from bisect import bisect

try:
    True
except:
    True = 1
    False = 0

DEBUG = False

MAXREADSIZE = 32768
MAXLOCKSIZE = 1000000000L
MAXLOCKRANGE = 3999999999L   # only lock first 4 gig of file

_pool = BufferPool()
PieceBuffer = _pool.new

def dummy_status(fractionDone = None, activity = None):
    pass

class Storage:
    def __init__(self, files, piece_length, doneflag, config,
                 disabled_files = None):
        # can raise IOError and ValueError
        self.files = files
        self.piece_length = piece_length
        self.doneflag = doneflag
        self.disabled = [False] * len(files)
        self.file_ranges = []
        self.disabled_ranges = []
        self.working_ranges = []
        numfiles = 0
        total = 0L
        so_far = 0L
        self.handles = {}
        self.whandles = {}
        self.tops = {}
        self.sizes = {}
        self.mtimes = {}
        if config.get('lock_files', True):
            self.lock_file, self.unlock_file = self._lock_file, self._unlock_file
        else:
            self.lock_file, self.unlock_file = lambda x1,x2: None, lambda x1,x2: None
        self.lock_while_reading = config.get('lock_while_reading', False)
        self.lock = Lock()

        if not disabled_files:
            disabled_files = [False] * len(files)

        for i in xrange(len(files)):
            file, length = files[i]
            if doneflag.isSet():    # bail out if doneflag is set
                return
            self.disabled_ranges.append(None)
            if length == 0:
                self.file_ranges.append(None)
                self.working_ranges.append([])
            else:
                range = (total, total + length, 0, file)
                self.file_ranges.append(range)
                self.working_ranges.append([range])
                numfiles += 1
                total += length
                if disabled_files[i]:
                    l = 0
                else:
                    if exists(file):
                        l = getsize(file)
                        if l > length:
                            h = open(file, 'rb+')
                            h.truncate(length)
                            h.flush()
                            h.close()
                            l = length
                    else:
                        l = 0
                        h = open(file, 'wb+')
                        h.flush()
                        h.close()
                    self.mtimes[file] = getmtime(file)
                self.tops[file] = l
                self.sizes[file] = length
                so_far += l

        self.total_length = total
        self._reset_ranges()

        self.max_files_open = config['max_files_open']
        if self.max_files_open > 0 and numfiles > self.max_files_open:
            self.handlebuffer = []
        else:
            self.handlebuffer = None
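
    # Usage sketch (illustrative only; the file name, sizes and config values
    # below are hypothetical, not taken from this module):
    #
    #   from threading import Event
    #   config = {'lock_files': True, 'lock_while_reading': False,
    #             'max_files_open': 50}
    #   storage = Storage([('data.bin', 131072)], 32768, Event(), config)
    #   storage.write(0, 'x' * 32768)      # write the first piece
    #   piece = storage.read(0, 32768)     # returns a PieceBuffer
    #   storage.close()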


    if os.name == 'nt':
        def _lock_file(self, name, f):
            import msvcrt
            for p in range(0, min(self.sizes[name],MAXLOCKRANGE), MAXLOCKSIZE):
                f.seek(p)
                msvcrt.locking(f.fileno(), msvcrt.LK_LOCK,
                               min(MAXLOCKSIZE,self.sizes[name]-p))

        def _unlock_file(self, name, f):
            import msvcrt
            for p in range(0, min(self.sizes[name],MAXLOCKRANGE), MAXLOCKSIZE):
                f.seek(p)
                msvcrt.locking(f.fileno(), msvcrt.LK_UNLCK,
                               min(MAXLOCKSIZE,self.sizes[name]-p))

    elif os.name == 'posix':
        def _lock_file(self, name, f):
            import fcntl
            fcntl.flock(f.fileno(), fcntl.LOCK_EX)

        def _unlock_file(self, name, f):
            import fcntl
            fcntl.flock(f.fileno(), fcntl.LOCK_UN)

    else:
        def _lock_file(self, name, f):
            pass
        def _unlock_file(self, name, f):
            pass


    def was_preallocated(self, pos, length):
        for file, begin, end in self._intervals(pos, length):
            if self.tops.get(file, 0) < end:
                return False
        return True


    def _sync(self, file):
        self._close(file)
        if self.handlebuffer:
            self.handlebuffer.remove(file)

    def sync(self):
        # may raise IOError or OSError
        for file in self.whandles.keys():
            self._sync(file)


    def set_readonly(self, f=None):
        if f is None:
            self.sync()
            return
        file = self.files[f][0]
        if self.whandles.has_key(file):
            self._sync(file)


    def get_total_length(self):
        return self.total_length


    def _open(self, file, mode):
        if self.mtimes.has_key(file):
            try:
                if self.handlebuffer is not None:
                    assert getsize(file) == self.tops[file]
                    newmtime = getmtime(file)
                    oldmtime = self.mtimes[file]
                    assert newmtime <= oldmtime+1
                    assert newmtime >= oldmtime-1
            except:
                if DEBUG:
                    print ( file+' modified: '
                            +strftime('(%x %X)',localtime(self.mtimes[file]))
                            +strftime(' != (%x %X) ?',localtime(getmtime(file))) )
                raise IOError('modified during download')
        try:
            return open(file, mode)
        except:
            if DEBUG:
                print_exc()
            raise


    def _close(self, file):
        f = self.handles[file]
        del self.handles[file]
        if self.whandles.has_key(file):
            del self.whandles[file]
            f.flush()
            self.unlock_file(file, f)
            f.close()
            self.tops[file] = getsize(file)
            self.mtimes[file] = getmtime(file)
        else:
            if self.lock_while_reading:
                self.unlock_file(file, f)
            f.close()


    def _close_file(self, file):
        if not self.handles.has_key(file):
            return
        self._close(file)
        if self.handlebuffer:
            self.handlebuffer.remove(file)


    def _get_file_handle(self, file, for_write):
        if self.handles.has_key(file):
            if for_write and not self.whandles.has_key(file):
                self._close(file)
                try:
                    f = self._open(file, 'rb+')
                    self.handles[file] = f
                    self.whandles[file] = 1
                    self.lock_file(file, f)
                except (IOError, OSError), e:
                    if DEBUG:
                        print_exc()
                    raise IOError('unable to reopen '+file+': '+str(e))

            if self.handlebuffer:
                if self.handlebuffer[-1] != file:
                    self.handlebuffer.remove(file)
                    self.handlebuffer.append(file)
            elif self.handlebuffer is not None:
                self.handlebuffer.append(file)
        else:
            try:
                if for_write:
                    f = self._open(file, 'rb+')
                    self.handles[file] = f
                    self.whandles[file] = 1
                    self.lock_file(file, f)
                else:
                    f = self._open(file, 'rb')
                    self.handles[file] = f
                    if self.lock_while_reading:
                        self.lock_file(file, f)
            except (IOError, OSError), e:
                if DEBUG:
                    print_exc()
                raise IOError('unable to open '+file+': '+str(e))

            if self.handlebuffer is not None:
                self.handlebuffer.append(file)
                if len(self.handlebuffer) > self.max_files_open:
                    self._close(self.handlebuffer.pop(0))

        return self.handles[file]


    def _reset_ranges(self):
        self.ranges = []
        for l in self.working_ranges:
            self.ranges.extend(l)
        self.begins = [i[0] for i in self.ranges]   # rebuilt once, after all ranges are collected

    def _intervals(self, pos, amount):
        r = []
        stop = pos + amount
        p = bisect(self.begins, pos) - 1
        while p < len(self.ranges):
            begin, end, offset, file = self.ranges[p]
            if begin >= stop:
                break
            r.append(( file,
                       offset + max(pos, begin) - begin,
                       offset + min(end, stop) - begin ))
            p += 1
        return r
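
    # Example (hypothetical sizes): with two 100-byte files 'a' and 'b',
    # _intervals(90, 20) would return
    #     [('a', 90, 100), ('b', 0, 10)]
    # i.e. (path, start offset within that file, end offset within that file)
    # for every on-disk range the requested span touches.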


    def read(self, pos, amount, flush_first = False):
        r = PieceBuffer()
        for file, pos, end in self._intervals(pos, amount):
            if DEBUG:
                print 'reading '+file+' from '+str(pos)+' to '+str(end)
            self.lock.acquire()
            h = self._get_file_handle(file, False)
            if flush_first and self.whandles.has_key(file):
                h.flush()
                fsync(h)
            h.seek(pos)
            while pos < end:
                length = min(end-pos, MAXREADSIZE)
                data = h.read(length)
                if len(data) != length:
                    raise IOError('error reading data from '+file)
                r.append(data)
                pos += length
            self.lock.release()
        return r

    def write(self, pos, s):
        # might raise an IOError
        total = 0
        for file, begin, end in self._intervals(pos, len(s)):
            if DEBUG:
                print 'writing '+file+' from '+str(pos)+' to '+str(end)
            self.lock.acquire()
            h = self._get_file_handle(file, True)
            h.seek(begin)
            h.write(s[total: total + end - begin])
            self.lock.release()
            total += end - begin

    def top_off(self):
        for begin, end, offset, file in self.ranges:
            l = offset + end - begin
            if l > self.tops.get(file, 0):
                self.lock.acquire()
                h = self._get_file_handle(file, True)
                h.seek(l-1)
                h.write(chr(0xFF))
                self.lock.release()

    def flush(self):
        # may raise IOError or OSError
        for file in self.whandles.keys():
            self.lock.acquire()
            self.handles[file].flush()
            self.lock.release()

    def close(self):
        for file, f in self.handles.items():
            try:
                self.unlock_file(file, f)
            except:
                pass
            try:
                f.close()
            except:
                pass
        self.handles = {}
        self.whandles = {}
        self.handlebuffer = None


    def _get_disabled_ranges(self, f):
        if not self.file_ranges[f]:
            return ((),(),())
        r = self.disabled_ranges[f]
        if r:
            return r
        start, end, offset, file = self.file_ranges[f]
        if DEBUG:
            print 'calculating disabled range for '+self.files[f][0]
            print 'bytes: '+str(start)+'-'+str(end)
            print 'file spans pieces '+str(int(start/self.piece_length))+'-'+str(int((end-1)/self.piece_length)+1)
        pieces = range( int(start/self.piece_length),
                        int((end-1)/self.piece_length)+1 )
        offset = 0
        disabled_files = []
        if len(pieces) == 1:
            if ( start % self.piece_length == 0
                 and end % self.piece_length == 0 ):   # happens to be a single,
                                                       # perfect piece
                working_range = [(start, end, offset, file)]
                update_pieces = []
            else:
                midfile = os.path.join(self.bufferdir,str(f))
                working_range = [(start, end, 0, midfile)]
                disabled_files.append((midfile, start, end))
                length = end - start
                self.sizes[midfile] = length
                piece = pieces[0]
                update_pieces = [(piece, start-(piece*self.piece_length), length)]
        else:
            update_pieces = []
            if start % self.piece_length != 0:  # doesn't begin on an even piece boundary
                end_b = pieces[1]*self.piece_length
                startfile = os.path.join(self.bufferdir,str(f)+'b')
                working_range_b = [ ( start, end_b, 0, startfile ) ]
                disabled_files.append((startfile, start, end_b))
                length = end_b - start
                self.sizes[startfile] = length
                offset = length
                piece = pieces.pop(0)
                update_pieces.append((piece, start-(piece*self.piece_length), length))
            else:
                working_range_b = []
            if f != len(self.files)-1 and end % self.piece_length != 0:
                                                # doesn't end on an even piece boundary
                start_e = pieces[-1] * self.piece_length
                endfile = os.path.join(self.bufferdir,str(f)+'e')
                working_range_e = [ ( start_e, end, 0, endfile ) ]
                disabled_files.append((endfile, start_e, end))
                length = end - start_e
                self.sizes[endfile] = length
                piece = pieces.pop(-1)
                update_pieces.append((piece, 0, length))
            else:
                working_range_e = []
            if pieces:
                working_range_m = [ ( pieces[0]*self.piece_length,
                                      (pieces[-1]+1)*self.piece_length,
                                      offset, file ) ]
            else:
                working_range_m = []
            working_range = working_range_b + working_range_m + working_range_e

        if DEBUG:
            print str(working_range)
            print str(update_pieces)
        r = (tuple(working_range), tuple(update_pieces), tuple(disabled_files))
        self.disabled_ranges[f] = r
        return r
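
    # Illustrative example (hypothetical numbers, piece_length = 100): a
    # disabled file covering global bytes 250-730 of the torrent is split
    # into a head buffer '<f>b' for bytes 250-300 (the tail of piece 2),
    # the real file for the whole pieces 3-6 (bytes 300-700, stored at file
    # offset 50), and a tail buffer '<f>e' for bytes 700-730 (the head of
    # piece 7), so pieces shared with neighbouring files can still be
    # hash-checked while this file is disabled.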


    def set_bufferdir(self, dir):
        self.bufferdir = dir

    def enable_file(self, f):
        if not self.disabled[f]:
            return
        self.disabled[f] = False
        r = self.file_ranges[f]
        if not r:
            return
        file = r[3]
        if not exists(file):
            h = open(file, 'wb+')
            h.flush()
            h.close()
        if not self.tops.has_key(file):
            self.tops[file] = getsize(file)
        if not self.mtimes.has_key(file):
            self.mtimes[file] = getmtime(file)
        self.working_ranges[f] = [r]

    def disable_file(self, f):
        if self.disabled[f]:
            return
        self.disabled[f] = True
        r = self._get_disabled_ranges(f)
        if not r:
            return
        for file, begin, end in r[2]:
            if not os.path.isdir(self.bufferdir):
                os.makedirs(self.bufferdir)
            if not exists(file):
                h = open(file, 'wb+')
                h.flush()
                h.close()
            if not self.tops.has_key(file):
                self.tops[file] = getsize(file)
            if not self.mtimes.has_key(file):
                self.mtimes[file] = getmtime(file)
        self.working_ranges[f] = r[0]

    reset_file_status = _reset_ranges


    def get_piece_update_list(self, f):
        return self._get_disabled_ranges(f)[1]


    def delete_file(self, f):
        try:
            os.remove(self.files[f][0])
        except:
            pass


    '''
    Pickled data format:

    d['files'] = [ file #, size, mtime {, file #, size, mtime...} ]
                    file # in the torrent, and the size and last modification
                    time for those files.  Missing files are either empty
                    or disabled.
    d['partial files'] = [ name, size, mtime... ]
                    Names, sizes and last modification times of files containing
                    partial piece data.  Filenames go by the following convention:
                    {file #, 0-based}{nothing, "b" or "e"}
                    eg: "0e" "3" "4b" "4e"
                    Here "b" holds the partial data for the first piece in
                    the file, "e" the last piece, and no letter means the
                    file is disabled but smaller than one piece, with all
                    of its data cached inside so adjacent files may be
                    verified.
    '''
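    # Illustrative value only (the sizes and mtimes below are made up):
    # a torrent whose file #0 is fully present and whose file #1 is
    # disabled part-way through its first piece might pickle to
    #     {'files': [0, 262144, 1199145600],
    #      'partial files': ['1b', 16384, 1199145601]}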
    def pickle(self):
        files = []
        pfiles = []
        for i in xrange(len(self.files)):
            if not self.files[i][1]:    # length == 0
                continue
            if self.disabled[i]:
                for file, start, end in self._get_disabled_ranges(i)[2]:
                    pfiles.extend([basename(file),getsize(file),getmtime(file)])
                continue
            file = self.files[i][0]
            files.extend([i,getsize(file),getmtime(file)])
        return {'files': files, 'partial files': pfiles}


    def unpickle(self, data):
        # assume all previously-disabled files have already been disabled
        try:
            files = {}
            pfiles = {}
            l = data['files']
            assert len(l) % 3 == 0
            l = [l[x:x+3] for x in xrange(0,len(l),3)]
            for f, size, mtime in l:
                files[f] = (size, mtime)
            l = data.get('partial files',[])
            assert len(l) % 3 == 0
            l = [l[x:x+3] for x in xrange(0,len(l),3)]
            for file, size, mtime in l:
                pfiles[file] = (size, mtime)

            valid_pieces = {}
            for i in xrange(len(self.files)):
                if self.disabled[i]:
                    continue
                r = self.file_ranges[i]
                if not r:
                    continue
                start, end, offset, file = r
                if DEBUG:
                    print 'adding '+file
                for p in xrange( int(start/self.piece_length),
                                 int((end-1)/self.piece_length)+1 ):
                    valid_pieces[p] = 1

            if DEBUG:
                print valid_pieces.keys()

            def test(old, size, mtime):
                oldsize, oldmtime = old
                if size != oldsize:
                    return False
                if mtime > oldmtime+1:
                    return False
                if mtime < oldmtime-1:
                    return False
                return True

            for i in xrange(len(self.files)):
                if self.disabled[i]:
                    for file, start, end in self._get_disabled_ranges(i)[2]:
                        f1 = basename(file)
                        if ( not pfiles.has_key(f1)
                             or not test(pfiles[f1],getsize(file),getmtime(file)) ):
                            if DEBUG:
                                print 'removing '+file
                            for p in xrange( int(start/self.piece_length),
                                             int((end-1)/self.piece_length)+1 ):
                                if valid_pieces.has_key(p):
                                    del valid_pieces[p]
                    continue
                file, size = self.files[i]
                if not size:
                    continue
                if ( not files.has_key(i)
                     or not test(files[i],getsize(file),getmtime(file)) ):
                    start, end, offset, file = self.file_ranges[i]
                    if DEBUG:
                        print 'removing '+file
                    for p in xrange( int(start/self.piece_length),
                                     int((end-1)/self.piece_length)+1 ):
                        if valid_pieces.has_key(p):
                            del valid_pieces[p]
        except:
            if DEBUG:
                print_exc()
            return []

        if DEBUG:
            print valid_pieces.keys()
        return valid_pieces.keys()