Subversion Repositories svnkaklik

Rev

Details | Last modification | View Log

Rev Author Line No. Line
36 kaklik 1
# Written by Bram Cohen
2
# see LICENSE.txt for license information
3
 
4
from BitTornado.bitfield import Bitfield
5
from sha import sha
6
from BitTornado.clock import clock
7
from traceback import print_exc
8
from random import randrange
9
try:
10
    True
11
except:
12
    True = 1
13
    False = 0
14
try:
    from bisect import insort
except ImportError:
    # Fallback for interpreters without bisect.insort: append and re-sort,
    # which produces the same ordering (just more slowly).
    def insort(l, item):
        l.append(item)
        l.sort()

# Set to True to print piece placement / movement traces.
DEBUG = False

# Minimum interval between fractionDone updates in old_style_init, in the
# units returned by clock() (presumably seconds).
STATS_INTERVAL = 0.2
24
 
25
def dummy_status(fractionDone = None, activity = None):
    """No-op progress callback used as the default ``statusfunc``.

    Accepts (and ignores) the ``fractionDone`` and ``activity`` keywords
    that StorageWrapper passes to its status callbacks.
    """
    return None
27
 
28
class Olist:
    """Small set of orderable items backed by a dict.

    Supports membership tests, insertion, discarding, and popping the
    smallest, the largest or the n-th smallest item.
    """

    def __init__(self, l = ()):
        # immutable default instead of a shared mutable list
        self.d = {}
        for i in l:
            self.d[i] = 1

    def __len__(self):
        return len(self.d)

    def includes(self, i):
        """Return True if ``i`` is in the set."""
        return i in self.d

    def add(self, i):
        self.d[i] = 1

    def extend(self, l):
        for i in l:
            self.d[i] = 1

    def pop(self, n=0):
        """Remove and return the n-th smallest item (0 = min, -1 = max).

        Raises ValueError (empty set with n of 0 or -1) or IndexError
        (n out of range) like the underlying min/max/list indexing.
        """
        if n == 0:
            i = min(self.d)
        elif n == -1:
            i = max(self.d)
        else:
            k = list(self.d)
            k.sort()
            i = k[n]
        del self.d[i]
        return i

    def remove(self, i):
        """Discard ``i`` if present; absent items are ignored."""
        if i in self.d:
            del self.d[i]
 
58
class fakeflag:
    """Minimal flag object exposing the ``wait``/``isSet`` interface that
    StorageWrapper uses for its ``flag`` and ``unpauseflag`` arguments.

    The state is fixed at construction; ``wait`` returns immediately.
    """

    def __init__(self, state=False):
        self.state = state

    def wait(self):
        # the state never changes, so there is nothing to wait for
        return None

    def isSet(self):
        return self.state
65
 
66
 
67
class StorageWrapper:
68
    def __init__(self, storage, request_size, hashes, 
69
            piece_size, finished, failed, 
70
            statusfunc = dummy_status, flag = fakeflag(), check_hashes = True,
71
            data_flunked = lambda x: None, backfunc = None,
72
            config = {}, unpauseflag = fakeflag(True) ):
73
        self.storage = storage
74
        self.request_size = long(request_size)
75
        self.hashes = hashes
76
        self.piece_size = long(piece_size)
77
        self.piece_length = long(piece_size)
78
        self.finished = finished
79
        self.failed = failed
80
        self.statusfunc = statusfunc
81
        self.flag = flag
82
        self.check_hashes = check_hashes
83
        self.data_flunked = data_flunked
84
        self.backfunc = backfunc
85
        self.config = config
86
        self.unpauseflag = unpauseflag
87
 
88
        self.alloc_type = config.get('alloc_type','normal')
89
        self.double_check = config.get('double_check', 0)
90
        self.triple_check = config.get('triple_check', 0)
91
        if self.triple_check:
92
            self.double_check = True
93
        self.bgalloc_enabled = False
94
        self.bgalloc_active = False
95
        self.total_length = storage.get_total_length()
96
        self.amount_left = self.total_length
97
        if self.total_length <= self.piece_size * (len(hashes) - 1):
98
            raise ValueError, 'bad data in responsefile - total too small'
99
        if self.total_length > self.piece_size * len(hashes):
100
            raise ValueError, 'bad data in responsefile - total too big'
101
        self.numactive = [0] * len(hashes)
102
        self.inactive_requests = [1] * len(hashes)
103
        self.amount_inactive = self.total_length
104
        self.amount_obtained = 0
105
        self.amount_desired = self.total_length
106
        self.have = Bitfield(len(hashes))
107
        self.have_cloaked_data = None
108
        self.blocked = [False] * len(hashes)
109
        self.blocked_holes = []
110
        self.blocked_movein = Olist()
111
        self.blocked_moveout = Olist()
112
        self.waschecked = [False] * len(hashes)
113
        self.places = {}
114
        self.holes = []
115
        self.stat_active = {}
116
        self.stat_new = {}
117
        self.dirty = {}
118
        self.stat_numflunked = 0
119
        self.stat_numdownloaded = 0
120
        self.stat_numfound = 0
121
        self.download_history = {}
122
        self.failed_pieces = {}
123
        self.out_of_place = 0
124
        self.write_buf_max = config['write_buffer_size']*1048576L
125
        self.write_buf_size = 0L
126
        self.write_buf = {}   # structure:  piece: [(start, data), ...]
127
        self.write_buf_list = []
128
 
129
        self.initialize_tasks = [
130
            ['checking existing data', 0, self.init_hashcheck, self.hashcheckfunc],
131
            ['moving data', 1, self.init_movedata, self.movedatafunc],
132
            ['allocating disk space', 1, self.init_alloc, self.allocfunc] ]
133
 
134
        self.backfunc(self._bgalloc,0.1)
135
        self.backfunc(self._bgsync,max(self.config['auto_flush']*60,60))
136
 
137
    def _bgsync(self):
138
        if self.config['auto_flush']:
139
            self.sync()
140
        self.backfunc(self._bgsync,max(self.config['auto_flush']*60,60))
141
 
142
 
143
    def old_style_init(self):
        """Run all remaining initialization tasks synchronously.

        Each task is (activity, start fraction, init, step).  When init()
        returns true, step() is called until it returns None, reporting
        progress through statusfunc at most once per STATS_INTERVAL.
        Returns False as soon as self.flag is set, otherwise True.
        """
        while self.initialize_tasks:
            activity, start, init_task, step = self.initialize_tasks.pop(0)
            if not init_task():
                continue
            self.statusfunc(activity = activity, fractionDone = start)
            deadline = clock() + STATS_INTERVAL
            progress = 0
            while progress is not None:
                if clock() > deadline:
                    deadline = clock() + STATS_INTERVAL
                    self.statusfunc(fractionDone = progress)
                self.unpauseflag.wait()
                if self.flag.isSet():
                    return False
                progress = step()

        self.statusfunc(fractionDone = 0)
        return True
161
 
162
 
163
    def initialize(self, donefunc, statusfunc = None):
164
        self.initialize_done = donefunc
165
        if statusfunc is None:
166
            statusfunc = self.statusfunc
167
        self.initialize_status = statusfunc
168
        self.initialize_next = None
169
 
170
        self.backfunc(self._initialize)
171
 
172
    def _initialize(self):
173
        if not self.unpauseflag.isSet():
174
            self.backfunc(self._initialize, 1)
175
            return
176
 
177
        if self.initialize_next:
178
            x = self.initialize_next()
179
            if x is None:
180
                self.initialize_next = None
181
            else:
182
                self.initialize_status(fractionDone = x)
183
        else:
184
            if not self.initialize_tasks:
185
                self.initialize_done()
186
                return
187
            msg, done, init, next = self.initialize_tasks.pop(0)
188
            if init():
189
                self.initialize_status(activity = msg, fractionDone = done)
190
                self.initialize_next = next
191
 
192
        self.backfunc(self._initialize)
193
 
194
 
195
    def init_hashcheck(self):
196
        if self.flag.isSet():
197
            return False
198
        self.check_list = []
199
        if len(self.hashes) == 0 or self.amount_left == 0:
200
            self.check_total = 0
201
            self.finished()
202
            return False
203
 
204
        self.check_targets = {}
205
        got = {}
206
        for p,v in self.places.items():
207
            assert not got.has_key(v)
208
            got[v] = 1
209
        for i in xrange(len(self.hashes)):
210
            if self.places.has_key(i):  # restored from pickled
211
                self.check_targets[self.hashes[i]] = []
212
                if self.places[i] == i:
213
                    continue
214
                else:
215
                    assert not got.has_key(i)
216
                    self.out_of_place += 1
217
            if got.has_key(i):
218
                continue
219
            if self._waspre(i):
220
                if self.blocked[i]:
221
                    self.places[i] = i
222
                else:
223
                    self.check_list.append(i)
224
                continue
225
            if not self.check_hashes:
226
                self.failed('told file complete on start-up, but data is missing')
227
                return False
228
            self.holes.append(i)
229
            if self.blocked[i] or self.check_targets.has_key(self.hashes[i]):
230
                self.check_targets[self.hashes[i]] = [] # in case of a hash collision, discard
231
            else:
232
                self.check_targets[self.hashes[i]] = [i]
233
        self.check_total = len(self.check_list)
234
        self.check_numchecked = 0.0
235
        self.lastlen = self._piecelen(len(self.hashes) - 1)
236
        self.numchecked = 0.0
237
        return self.check_total > 0
238
 
239
    def _markgot(self, piece, pos):
        """Record that ``piece`` is stored at position ``pos`` and is owned.

        Moves the piece's length from amount_left / amount_inactive into
        amount_obtained, clears its pending requests, and records in
        waschecked whether its data was hash-verified (self.check_hashes).
        """
        if DEBUG:
            print(str(piece) + ' at ' + str(pos))
        self.places[piece] = pos
        self.have[piece] = True
        # named ``length`` so the builtin len() is not shadowed
        length = self._piecelen(piece)
        self.amount_obtained += length
        self.amount_left -= length
        self.amount_inactive -= length
        self.inactive_requests[piece] = None
        self.waschecked[piece] = self.check_hashes
        self.stat_numfound += 1
251
 
252
    def hashcheckfunc(self):
        """Step function for the 'checking existing data' initialization task.

        Takes the next position i from self.check_list and decides what is
        stored there:
          * check_hashes off                -> trusted as piece i
          * data hashes to piece i          -> _markgot(i, i)
          * data hashes to a piece listed in
            check_targets (same length)     -> _markgot(that piece, i)
          * the first lastlen bytes hash to
            the final piece                 -> _markgot(last piece, i)
          * otherwise                       -> places[i] = i, not marked owned
        The out-of-place cases also increment self.out_of_place.

        Returns the fraction of check_list processed so far, or None when
        shutting down, when the list is exhausted, or after a read error.
        Calls self.finished() once nothing is left to download.
        """
        if self.flag.isSet():
            return None
        if not self.check_list:
            return None

        i = self.check_list.pop(0)
        if not self.check_hashes:
            self._markgot(i, i)
        else:
            # Hash the first lastlen bytes on their own: that prefix digest
            # (sp) is what the final, shorter piece would hash to.
            d1 = self.read_raw(i,0,self.lastlen)
            if d1 is None:
                return None
            sh = sha(d1[:])
            d1.release()
            sp = sh.digest()
            # digest() does not finalize the hash object, so the rest of the
            # data can still be fed in to get the full-piece digest.
            d2 = self.read_raw(i,self.lastlen,self._piecelen(i)-self.lastlen)
            if d2 is None:
                return None
            sh.update(d2[:])
            d2.release()
            s = sh.digest()
            if s == self.hashes[i]:
                self._markgot(i, i)
            elif ( self.check_targets.get(s)
                   and self._piecelen(i) == self._piecelen(self.check_targets[s][-1]) ):
                # the data belongs to another, still missing piece
                self._markgot(self.check_targets[s].pop(), i)
                self.out_of_place += 1
            elif ( not self.have[-1] and sp == self.hashes[-1]
                   and (i == len(self.hashes) - 1
                        or not self._waspre(len(self.hashes) - 1)) ):
                # the data is the final (short) piece, stored at position i
                self._markgot(len(self.hashes) - 1, i)
                self.out_of_place += 1
            else:
                self.places[i] = i
        self.numchecked += 1
        if self.amount_left == 0:
            self.finished()
        return (self.numchecked / self.check_total)
291
 
292
 
293
    def init_movedata(self):
294
        if self.flag.isSet():
295
            return False
296
        if self.alloc_type != 'sparse':
297
            return False
298
        self.storage.top_off()  # sets file lengths to their final size
299
        self.movelist = []
300
        if self.out_of_place == 0:
301
            for i in self.holes:
302
                self.places[i] = i
303
            self.holes = []
304
            return False
305
        self.tomove = float(self.out_of_place)
306
        for i in xrange(len(self.hashes)):
307
            if not self.places.has_key(i):
308
                self.places[i] = i
309
            elif self.places[i] != i:
310
                self.movelist.append(i)
311
        self.holes = []
312
        return True
313
 
314
    def movedatafunc(self):
315
        if self.flag.isSet():
316
            return None
317
        if not self.movelist:
318
            return None
319
        i = self.movelist.pop(0)
320
        old = self.read_raw(self.places[i], 0, self._piecelen(i))
321
        if old is None:
322
            return None
323
        if not self.write_raw(i, 0, old):
324
            return None
325
        if self.double_check and self.have[i]:
326
            if self.triple_check:
327
                old.release()
328
                old = self.read_raw( i, 0, self._piecelen(i),
329
                                            flush_first = True )
330
                if old is None:
331
                    return None
332
            if sha(old[:]).digest() != self.hashes[i]:
333
                self.failed('download corrupted; please restart and resume')
334
                return None
335
        old.release()
336
 
337
        self.places[i] = i
338
        self.tomove -= 1
339
        return (self.tomove / self.out_of_place)
340
 
341
 
342
    def init_alloc(self):
343
        if self.flag.isSet():
344
            return False
345
        if not self.holes:
346
            return False
347
        self.numholes = float(len(self.holes))
348
        self.alloc_buf = chr(0xFF) * self.piece_size
349
        if self.alloc_type == 'pre-allocate':
350
            self.bgalloc_enabled = True
351
            return True
352
        if self.alloc_type == 'background':
353
            self.bgalloc_enabled = True
354
        if self.blocked_moveout:
355
            return True
356
        return False
357
 
358
 
359
    def _allocfunc(self):
360
        while self.holes:
361
            n = self.holes.pop(0)
362
            if self.blocked[n]: # assume not self.blocked[index]
363
                if not self.blocked_movein:
364
                    self.blocked_holes.append(n)
365
                    continue
366
                if not self.places.has_key(n):
367
                    b = self.blocked_movein.pop(0)
368
                    oldpos = self._move_piece(b, n)
369
                    self.places[oldpos] = oldpos
370
                    return None
371
            if self.places.has_key(n):
372
                oldpos = self._move_piece(n, n)
373
                self.places[oldpos] = oldpos
374
                return None
375
            return n
376
        return None
377
 
378
    def allocfunc(self):
379
        if self.flag.isSet():
380
            return None
381
 
382
        if self.blocked_moveout:
383
            self.bgalloc_active = True
384
            n = self._allocfunc()
385
            if n is not None:
386
                if self.blocked_moveout.includes(n):
387
                    self.blocked_moveout.remove(n)
388
                    b = n
389
                else:
390
                    b = self.blocked_moveout.pop(0)
391
                oldpos = self._move_piece(b,n)
392
                self.places[oldpos] = oldpos
393
            return len(self.holes) / self.numholes
394
 
395
        if self.holes and self.bgalloc_enabled:
396
            self.bgalloc_active = True
397
            n = self._allocfunc()
398
            if n is not None:
399
                self.write_raw(n, 0, self.alloc_buf[:self._piecelen(n)])
400
                self.places[n] = n
401
            return len(self.holes) / self.numholes
402
 
403
        self.bgalloc_active = False
404
        return None
405
 
406
    def bgalloc(self):
407
        if self.bgalloc_enabled:
408
            if not self.holes and not self.blocked_moveout and self.backfunc:
409
                self.backfunc(self.storage.flush)
410
                # force a flush whenever the "finish allocation" button is hit
411
        self.bgalloc_enabled = True
412
        return False
413
 
414
    def _bgalloc(self):
415
        self.allocfunc()
416
        if self.config.get('alloc_rate',0) < 0.1:
417
            self.config['alloc_rate'] = 0.1
418
        self.backfunc( self._bgalloc,
419
              float(self.piece_size)/(self.config['alloc_rate']*1048576) )
420
 
421
 
422
    def _waspre(self, piece):
423
        return self.storage.was_preallocated(piece * self.piece_size, self._piecelen(piece))
424
 
425
    def _piecelen(self, piece):
426
        if piece < len(self.hashes) - 1:
427
            return self.piece_size
428
        else:
429
            return self.total_length - (piece * self.piece_size)
430
 
431
    def get_amount_left(self):
432
        return self.amount_left
433
 
434
    def do_I_have_anything(self):
435
        return self.amount_left < self.total_length
436
 
437
    def _make_inactive(self, index):
438
        length = self._piecelen(index)
439
        l = []
440
        x = 0
441
        while x + self.request_size < length:
442
            l.append((x, self.request_size))
443
            x += self.request_size
444
        l.append((x, length - x))
445
        self.inactive_requests[index] = l
446
 
447
    def is_endgame(self):
448
        return not self.amount_inactive
449
 
450
    def am_I_complete(self):
451
        return self.amount_obtained == self.amount_desired
452
 
453
    def reset_endgame(self, requestlist):
454
        for index, begin, length in requestlist:
455
            self.request_lost(index, begin, length)
456
 
457
    def get_have_list(self):
458
        return self.have.tostring()
459
 
460
    def get_have_list_cloaked(self):
        """Return (bitfield string, cleared indices) with a few bits hidden.

        On first call, copies self.have, clears between 2 and 4 distinct
        randomly chosen bits (fewer for very small torrents) among the
        first 32 pieces, and caches the result in self.have_cloaked_data;
        later calls return the cached value.
        """
        if self.have_cloaked_data is not None:
            return self.have_cloaked_data
        numpieces = len(self.hashes)
        cloaked = Bitfield(copyfrom = self.have)
        hidden = []
        count = min(randrange(2, 5), numpieces)    # between 2-4 unless torrent is small
        while len(hidden) < count:
            candidate = randrange(min(32, numpieces))    # all in first 4 bytes
            if candidate not in hidden:
                hidden.append(candidate)
                cloaked[candidate] = False
        self.have_cloaked_data = (cloaked.tostring(), hidden)
        return self.have_cloaked_data
472
 
473
    def do_I_have(self, index):
474
        return self.have[index]
475
 
476
    def do_I_have_requests(self, index):
477
        return not not self.inactive_requests[index]
478
 
479
    def is_unstarted(self, index):
480
        return ( not self.have[index] and not self.numactive[index]
481
                 and not self.dirty.has_key(index) )
482
 
483
    def get_hash(self, index):
484
        return self.hashes[index]
485
 
486
    def get_stats(self):
487
        return self.amount_obtained, self.amount_desired
488
 
489
    def new_request(self, index):
490
        # returns (begin, length)
491
        if self.inactive_requests[index] == 1:
492
            self._make_inactive(index)
493
        self.numactive[index] += 1
494
        self.stat_active[index] = 1
495
        if not self.dirty.has_key(index):
496
            self.stat_new[index] = 1
497
        rs = self.inactive_requests[index]
498
#        r = min(rs)
499
#        rs.remove(r)
500
        r = rs.pop(0)
501
        self.amount_inactive -= r[1]
502
        return r
503
 
504
 
505
    def write_raw(self, index, begin, data):
506
        try:
507
            self.storage.write(self.piece_size * index + begin, data)
508
            return True
509
        except IOError, e:
510
            self.failed('IO Error: ' + str(e))
511
            return False
512
 
513
 
514
    def _write_to_buffer(self, piece, start, data):
515
        if not self.write_buf_max:
516
            return self.write_raw(self.places[piece], start, data)
517
        self.write_buf_size += len(data)
518
        while self.write_buf_size > self.write_buf_max:
519
            old = self.write_buf_list.pop(0)
520
            if not self._flush_buffer(old, True):
521
                return False
522
        if self.write_buf.has_key(piece):
523
            self.write_buf_list.remove(piece)
524
        else:
525
            self.write_buf[piece] = []
526
        self.write_buf_list.append(piece)
527
        self.write_buf[piece].append((start,data))
528
        return True
529
 
530
    def _flush_buffer(self, piece, popped = False):
531
        if not self.write_buf.has_key(piece):
532
            return True
533
        if not popped:
534
            self.write_buf_list.remove(piece)
535
        l = self.write_buf[piece]
536
        del self.write_buf[piece]
537
        l.sort()
538
        for start, data in l:
539
            self.write_buf_size -= len(data)
540
            if not self.write_raw(self.places[piece], start, data):
541
                return False
542
        return True
543
 
544
    def sync(self):
545
        spots = {}
546
        for p in self.write_buf_list:
547
            spots[self.places[p]] = p
548
        l = spots.keys()
549
        l.sort()
550
        for i in l:
551
            try:
552
                self._flush_buffer(spots[i])
553
            except:
554
                pass
555
        try:
556
            self.storage.sync()
557
        except IOError, e:
558
            self.failed('IO Error: ' + str(e))
559
        except OSError, e:
560
            self.failed('OS Error: ' + str(e))
561
 
562
 
563
    def _move_piece(self, index, newpos):
        """Copy piece ``index`` from its current position to ``newpos``.

        Updates self.places[index] and the blocked move-in/move-out sets,
        and re-verifies owned data when triple_check is on (or double_check
        when the piece reaches its own position).  Returns the piece's old
        position, or -1 after a read/write error or hash mismatch (both
        reported via self.failed).  The read buffer is now released on the
        failure paths too.
        """
        oldpos = self.places[index]
        if DEBUG:
            print('moving ' + str(index) + ' from ' + str(oldpos) + ' to ' + str(newpos))
        assert oldpos != index
        assert oldpos != newpos
        assert index == newpos or newpos not in self.places
        old = self.read_raw(oldpos, 0, self._piecelen(index))
        if old is None:
            return -1
        if not self.write_raw(newpos, 0, old):
            old.release()
            return -1
        self.places[index] = newpos
        if self.have[index] and (
                self.triple_check or (self.double_check and index == newpos) ):
            if self.triple_check:
                # re-read from disk so the check covers what was written
                old.release()
                old = self.read_raw(newpos, 0, self._piecelen(index),
                                    flush_first = True)
                if old is None:
                    return -1
            if sha(old[:]).digest() != self.hashes[index]:
                old.release()
                self.failed('download corrupted; please restart and resume')
                return -1
        old.release()

        # blocked_movein: blocked pieces at unblocked positions;
        # blocked_moveout: unblocked pieces at blocked positions
        if self.blocked[index]:
            self.blocked_moveout.remove(index)
            if self.blocked[newpos]:
                self.blocked_movein.remove(index)
            else:
                self.blocked_movein.add(index)
        else:
            self.blocked_movein.remove(index)
            if self.blocked[newpos]:
                self.blocked_moveout.add(index)
            else:
                self.blocked_moveout.remove(index)

        return oldpos
603
 
604
    def _clear_space(self, index):
605
        h = self.holes.pop(0)
606
        n = h
607
        if self.blocked[n]: # assume not self.blocked[index]
608
            if not self.blocked_movein:
609
                self.blocked_holes.append(n)
610
                return True    # repeat
611
            if not self.places.has_key(n):
612
                b = self.blocked_movein.pop(0)
613
                oldpos = self._move_piece(b, n)
614
                if oldpos < 0:
615
                    return False
616
                n = oldpos
617
        if self.places.has_key(n):
618
            oldpos = self._move_piece(n, n)
619
            if oldpos < 0:
620
                return False
621
            n = oldpos
622
        if index == n or index in self.holes:
623
            if n == h:
624
                self.write_raw(n, 0, self.alloc_buf[:self._piecelen(n)])
625
            self.places[index] = n
626
            if self.blocked[n]:
627
                # because n may be a spot cleared 10 lines above, it's possible
628
                # for it to be blocked.  While that spot could be left cleared
629
                # and a new spot allocated, this condition might occur several
630
                # times in a row, resulting in a significant amount of disk I/O,
631
                # delaying the operation of the engine.  Rather than do this,
632
                # queue the piece to be moved out again, which will be performed
633
                # by the background allocator, with which data movement is
634
                # automatically limited.
635
                self.blocked_moveout.add(index)
636
            return False
637
        for p, v in self.places.items():
638
            if v == index:
639
                break
640
        else:
641
            self.failed('download corrupted; please restart and resume')
642
            return False
643
        self._move_piece(p, n)
644
        self.places[index] = index
645
        return False
646
 
647
 
648
    def piece_came_in(self, index, begin, piece, source = None):
649
        assert not self.have[index]
650
 
651
        if not self.places.has_key(index):
652
            while self._clear_space(index):
653
                pass
654
            if DEBUG:
655
                print 'new place for '+str(index)+' at '+str(self.places[index])
656
        if self.flag.isSet():
657
            return
658
 
659
        if self.failed_pieces.has_key(index):
660
            old = self.read_raw(self.places[index], begin, len(piece))
661
            if old is None:
662
                return True
663
            if old[:].tostring() != piece:
664
                try:
665
                    self.failed_pieces[index][self.download_history[index][begin]] = 1
666
                except:
667
                    self.failed_pieces[index][None] = 1
668
            old.release()
669
        self.download_history.setdefault(index,{})[begin] = source
670
 
671
        if not self._write_to_buffer(index, begin, piece):
672
            return True
673
 
674
        self.amount_obtained += len(piece)
675
        self.dirty.setdefault(index,[]).append((begin, len(piece)))
676
        self.numactive[index] -= 1
677
        assert self.numactive[index] >= 0
678
        if not self.numactive[index]:
679
            del self.stat_active[index]
680
        if self.stat_new.has_key(index):
681
            del self.stat_new[index]
682
 
683
        if self.inactive_requests[index] or self.numactive[index]:
684
            return True
685
 
686
        del self.dirty[index]
687
        if not self._flush_buffer(index):
688
            return True
689
        length = self._piecelen(index)
690
        data = self.read_raw(self.places[index], 0, length,
691
                                 flush_first = self.triple_check)
692
        if data is None:
693
            return True
694
        hash = sha(data[:]).digest()
695
        data.release()
696
        if hash != self.hashes[index]:
697
 
698
            self.amount_obtained -= length
699
            self.data_flunked(length, index)
700
            self.inactive_requests[index] = 1
701
            self.amount_inactive += length
702
            self.stat_numflunked += 1
703
 
704
            self.failed_pieces[index] = {}
705
            allsenders = {}
706
            for d in self.download_history[index].values():
707
                allsenders[d] = 1
708
            if len(allsenders) == 1:
709
                culprit = allsenders.keys()[0]
710
                if culprit is not None:
711
                    culprit.failed(index, bump = True)
712
                del self.failed_pieces[index] # found the culprit already
713
 
714
            return False
715
 
716
        self.have[index] = True
717
        self.inactive_requests[index] = None
718
        self.waschecked[index] = True
719
        self.amount_left -= length
720
        self.stat_numdownloaded += 1
721
 
722
        for d in self.download_history[index].values():
723
            if d is not None:
724
                d.good(index)
725
        del self.download_history[index]
726
        if self.failed_pieces.has_key(index):
727
            for d in self.failed_pieces[index].keys():
728
                if d is not None:
729
                    d.failed(index)
730
            del self.failed_pieces[index]
731
 
732
        if self.amount_left == 0:
733
            self.finished()
734
        return True
735
 
736
 
737
    def request_lost(self, index, begin, length):
738
        assert not (begin, length) in self.inactive_requests[index]
739
        insort(self.inactive_requests[index], (begin, length))
740
        self.amount_inactive += length
741
        self.numactive[index] -= 1
742
        if not self.numactive[index]:
743
            del self.stat_active[index]
744
            if self.stat_new.has_key(index):
745
                del self.stat_new[index]
746
 
747
 
748
    def get_piece(self, index, begin, length):
749
        if not self.have[index]:
750
            return None
751
        data = None
752
        if not self.waschecked[index]:
753
            data = self.read_raw(self.places[index], 0, self._piecelen(index))
754
            if data is None:
755
                return None
756
            if sha(data[:]).digest() != self.hashes[index]:
757
                self.failed('told file complete on start-up, but piece failed hash check')
758
                return None
759
            self.waschecked[index] = True
760
            if length == -1 and begin == 0:
761
                return data     # optimization
762
        if length == -1:
763
            if begin > self._piecelen(index):
764
                return None
765
            length = self._piecelen(index)-begin
766
            if begin == 0:
767
                return self.read_raw(self.places[index], 0, length)
768
        elif begin + length > self._piecelen(index):
769
            return None
770
        if data is not None:
771
            s = data[begin:begin+length]
772
            data.release()
773
            return s
774
        data = self.read_raw(self.places[index], begin, length)
775
        if data is None:
776
            return None
777
        s = data.getarray()
778
        data.release()
779
        return s
780
 
781
    def read_raw(self, piece, begin, length, flush_first = False):
782
        try:
783
            return self.storage.read(self.piece_size * piece + begin,
784
                                                     length, flush_first)
785
        except IOError, e:
786
            self.failed('IO Error: ' + str(e))
787
            return None
788
 
789
 
790
    def set_file_readonly(self, n):
791
        try:
792
            self.storage.set_readonly(n)
793
        except IOError, e:
794
            self.failed('IO Error: ' + str(e))
795
        except OSError, e:
796
            self.failed('OS Error: ' + str(e))
797
 
798
 
799
    def has_data(self, index):
800
        return index not in self.holes and index not in self.blocked_holes
801
 
802
    def doublecheck_data(self, pieces_to_check):
803
        if not self.double_check:
804
            return
805
        sources = []
806
        for p,v in self.places.items():
807
            if pieces_to_check.has_key(v):
808
                sources.append(p)
809
        assert len(sources) == len(pieces_to_check)
810
        sources.sort()
811
        for index in sources:
812
            if self.have[index]:
813
                piece = self.read_raw(self.places[index],0,self._piecelen(index),
814
                                       flush_first = True )
815
                if piece is None:
816
                    return False
817
                if sha(piece[:]).digest() != self.hashes[index]:
818
                    self.failed('download corrupted; please restart and resume')
819
                    return False
820
                piece.release()
821
        return True
822
 
823
 
824
    def reblock(self, new_blocked):
        """Switch to a new per-piece ``blocked`` mask.

        new_blocked -- sequence of booleans indexed by piece; a blocked
                       piece is excluded from amount_desired.

        For every piece whose state changes, moves its length out of (or
        back into) amount_desired and adjusts amount_obtained and
        amount_inactive to match.  Then replaces self.blocked, rebuilds
        blocked_movein (blocked pieces stored at unblocked positions) and
        blocked_moveout (unblocked pieces stored at blocked positions) from
        self.places, and returns parked blocked_holes to self.holes.
        """
        # assume downloads have already been canceled and chunks made inactive
        for i in xrange(len(new_blocked)):
            if new_blocked[i] and not self.blocked[i]:
                # piece becomes blocked: stop counting it
                length = self._piecelen(i)
                self.amount_desired -= length
                if self.have[i]:
                    self.amount_obtained -= length
                    continue
                if self.inactive_requests[i] == 1:
                    # never split into requests: the whole piece was inactive
                    self.amount_inactive -= length
                    continue
                # inactive bytes leave amount_inactive; the rest (received
                # bytes, given the assumption above) leave amount_obtained
                inactive = 0
                for nb, nl in self.inactive_requests[i]:
                    inactive += nl
                self.amount_inactive -= inactive
                self.amount_obtained -= length - inactive

            if self.blocked[i] and not new_blocked[i]:
                # piece becomes unblocked: count it again (mirror of above)
                length = self._piecelen(i)
                self.amount_desired += length
                if self.have[i]:
                    self.amount_obtained += length
                    continue
                if self.inactive_requests[i] == 1:
                    self.amount_inactive += length
                    continue
                inactive = 0
                for nb, nl in self.inactive_requests[i]:
                    inactive += nl
                self.amount_inactive += inactive
                self.amount_obtained += length - inactive

        self.blocked = new_blocked

        # rebuild the move-in / move-out sets from the current placement
        self.blocked_movein = Olist()
        self.blocked_moveout = Olist()
        for p,v in self.places.items():
            if p != v:
                if self.blocked[p] and not self.blocked[v]:
                    self.blocked_movein.add(p)
                elif self.blocked[v] and not self.blocked[p]:
                    self.blocked_moveout.add(p)

        self.holes.extend(self.blocked_holes)    # reset holes list
        self.holes.sort()
        self.blocked_holes = []
871
 
872
 
873
    '''
874
    Pickled data format:
875
 
876
    d['pieces'] = either a string containing a bitfield of complete pieces,
877
                    or the numeric value "1" signifying a seed.  If it is
878
                    a seed, d['places'] and d['partials'] should be empty
879
                    and needn't even exist.
880
    d['partials'] = [ piece, [ offset, length... ]... ]
881
                    a list of partial data that had been previously
882
                    downloaded, plus the given offsets.  Adjacent partials
883
                    are merged so as to save space, and so that if the
884
                    request size changes then new requests can be
885
                    calculated more efficiently.
886
    d['places'] = [ piece, place, {,piece, place ...} ]
887
                    the piece index, and the place it's stored.
888
                    If d['pieces'] specifies a complete piece or d['partials']
889
                    specifies a set of partials for a piece which has no
890
                    entry in d['places'], it can be assumed that
891
                    place[index] = index.  A place specified with no
892
                    corresponding data in d['pieces'] or d['partials']
893
                    indicates allocated space with no valid data, and is
894
                    reserved so it doesn't need to be hash-checked.
895
    '''
896
    def pickle(self):
897
        if self.have.complete():
898
            return {'pieces': 1}
899
        pieces = Bitfield(len(self.hashes))
900
        places = []
901
        partials = []
902
        for p in xrange(len(self.hashes)):
903
            if self.blocked[p] or not self.places.has_key(p):
904
                continue
905
            h = self.have[p]
906
            pieces[p] = h
907
            pp = self.dirty.get(p)
908
            if not h and not pp:  # no data
909
                places.extend([self.places[p],self.places[p]])
910
            elif self.places[p] != p:
911
                places.extend([p, self.places[p]])
912
            if h or not pp:
913
                continue
914
            pp.sort()
915
            r = []
916
            while len(pp) > 1:
917
                if pp[0][0]+pp[0][1] == pp[1][0]:
918
                    pp[0] = list(pp[0])
919
                    pp[0][1] += pp[1][1]
920
                    del pp[1]
921
                else:
922
                    r.extend(pp[0])
923
                    del pp[0]
924
            r.extend(pp[0])
925
            partials.extend([p,r])
926
        return {'pieces': pieces.tostring(), 'places': places, 'partials': partials}
927
 
928
 
929
    def unpickle(self, data, valid_places):
930
        got = {}
931
        places = {}
932
        dirty = {}
933
        download_history = {}
934
        stat_active = {}
935
        stat_numfound = self.stat_numfound
936
        amount_obtained = self.amount_obtained
937
        amount_inactive = self.amount_inactive
938
        amount_left = self.amount_left
939
        inactive_requests = [x for x in self.inactive_requests]
940
        restored_partials = []
941
 
942
        try:
943
            if data['pieces'] == 1:     # a seed
944
                assert not data.get('places',None)
945
                assert not data.get('partials',None)
946
                have = Bitfield(len(self.hashes))
947
                for i in xrange(len(self.hashes)):
948
                    have[i] = True
949
                assert have.complete()
950
                _places = []
951
                _partials = []
952
            else:
953
                have = Bitfield(len(self.hashes), data['pieces'])
954
                _places = data['places']
955
                assert len(_places) % 2 == 0
956
                _places = [_places[x:x+2] for x in xrange(0,len(_places),2)]
957
                _partials = data['partials']
958
                assert len(_partials) % 2 == 0
959
                _partials = [_partials[x:x+2] for x in xrange(0,len(_partials),2)]
960
 
961
            for index, place in _places:
962
                if place not in valid_places:
963
                    continue
964
                assert not got.has_key(index)
965
                assert not got.has_key(place)
966
                places[index] = place
967
                got[index] = 1
968
                got[place] = 1
969
 
970
            for index in xrange(len(self.hashes)):
971
                if have[index]:
972
                    if not places.has_key(index):
973
                        if index not in valid_places:
974
                            have[index] = False
975
                            continue
976
                        assert not got.has_key(index)
977
                        places[index] = index
978
                        got[index] = 1
979
                    length = self._piecelen(index)
980
                    amount_obtained += length
981
                    stat_numfound += 1
982
                    amount_inactive -= length
983
                    amount_left -= length
984
                    inactive_requests[index] = None
985
 
986
            for index, plist in _partials:
987
                assert not dirty.has_key(index)
988
                assert not have[index]
989
                if not places.has_key(index):
990
                    if index not in valid_places:
991
                        continue
992
                    assert not got.has_key(index)
993
                    places[index] = index
994
                    got[index] = 1
995
                assert len(plist) % 2 == 0
996
                plist = [plist[x:x+2] for x in xrange(0,len(plist),2)]
997
                dirty[index] = plist
998
                stat_active[index] = 1
999
                download_history[index] = {}
1000
                # invert given partials
1001
                length = self._piecelen(index)
1002
                l = []
1003
                if plist[0][0] > 0:
1004
                    l.append((0,plist[0][0]))
1005
                for i in xrange(len(plist)-1):
1006
                    end = plist[i][0]+plist[i][1]
1007
                    assert not end > plist[i+1][0]
1008
                    l.append((end,plist[i+1][0]-end))
1009
                end = plist[-1][0]+plist[-1][1]
1010
                assert not end > length
1011
                if end < length:
1012
                    l.append((end,length-end))
1013
                # split them to request_size
1014
                ll = []
1015
                amount_obtained += length
1016
                amount_inactive -= length
1017
                for nb, nl in l:
1018
                    while nl > 0:
1019
                        r = min(nl,self.request_size)
1020
                        ll.append((nb,r))
1021
                        amount_inactive += r
1022
                        amount_obtained -= r
1023
                        nb += self.request_size
1024
                        nl -= self.request_size
1025
                inactive_requests[index] = ll
1026
                restored_partials.append(index)
1027
 
1028
            assert amount_obtained + amount_inactive == self.amount_desired
1029
        except:
1030
#            print_exc()
1031
            return []   # invalid data, discard everything
1032
 
1033
        self.have = have
1034
        self.places = places
1035
        self.dirty = dirty
1036
        self.download_history = download_history
1037
        self.stat_active = stat_active
1038
        self.stat_numfound = stat_numfound
1039
        self.amount_obtained = amount_obtained
1040
        self.amount_inactive = amount_inactive
1041
        self.amount_left = amount_left
1042
        self.inactive_requests = inactive_requests
1043
 
1044
        return restored_partials
1045