Subversion Repositories svnkaklik

Rev

Details | Last modification | View Log

Rev Author Line No. Line
36 kaklik 1
#!/usr/bin/env python
2
 
3
# Written by John Hoffman
4
# see LICENSE.txt for license information
5
 
6
from BitTornado import PSYCO
7
if PSYCO.psyco:
    # Optional acceleration: enable psyco when the PSYCO switch is on and a
    # recent enough psyco is importable.  Strictly best-effort -- any failure
    # (module missing, no __version__, psyco.full() refusing to run) leaves
    # the program running unaccelerated.
    try:
        import psyco
        # Explicit comparison instead of ``assert`` so the version gate is
        # still enforced when Python runs with -O (which strips asserts).
        if psyco.__version__ >= 0x010100f0:
            psyco.full()
    except Exception:
        pass
14
 
15
from download_bt1 import BT1Download
16
from RawServer import RawServer, UPnP_ERROR
17
from RateLimiter import RateLimiter
18
from ServerPortHandler import MultiHandler
19
from parsedir import parsedir
20
from natpunch import UPnP_test
21
from random import seed
22
from socket import error as socketerror
23
from threading import Event
24
from sys import argv, exit
25
import sys, os
26
from clock import clock
27
from __init__ import createPeerID, mapbase64, version
28
from cStringIO import StringIO
29
from traceback import print_exc
30
 
31
# Compatibility shim for interpreters that predate the True/False builtins.
try:
    True
except NameError:
    # Assigned through globals() rather than ``True = 1``: the plain
    # assignment is a SyntaxError on Python 3 and would stop this module from
    # even being parsed there, while the effect on an old interpreter is the
    # same (module-level names True and False).
    globals()['True'] = 1
    globals()['False'] = 0
36
 
37
 
38
def fmttime(n):
    """Format a number of seconds as 'H:MM:SS'.

    n -- remaining time in seconds; anything int() accepts.

    Returns the string 'downloading' instead of a time when n cannot be
    converted to an integer (e.g. None) or is 60 days or more.
    """
    try:
        n = int(n)  # n may be None or too large
    except (TypeError, ValueError, OverflowError):
        return 'downloading'
    # Explicit range check (previously an assert, which python -O removes).
    if n >= 5184000:  # 60 days
        return 'downloading'
    m, s = divmod(n, 60)
    h, m = divmod(m, 60)
    return '%d:%02d:%02d' % (h, m, s)
47
 
48
class SingleDownload:
    """State and lifecycle of one torrent run by a controller (LaunchMany).

    Progress through the lifecycle is tracked with plain flags:
    ``waiting`` (queued for a hash check), ``checking`` (hash check running),
    ``working`` (engine started and listening), ``seed`` (the finished
    callback has fired) and ``closed`` (shutdown() has completed).
    """

    def __init__(self, controller, hash, response, config, myid):
        """Create the per-torrent raw server and the BT1Download object.

        controller -- owning object; this class uses its handler,
                      exchandler, listen_port, ratelimiter, saveAs(),
                      hashchecksched(), was_stopped() and died().
        hash       -- key under which the controller knows this torrent.
        response   -- handed to BT1Download unchanged.
        config     -- configuration mapping, handed to BT1Download.
        myid       -- peer id, handed to BT1Download.
        """
        self.controller = controller
        self.hash = hash
        self.response = response
        self.config = config

        self.doneflag = Event()
        self.waiting = True
        self.checking = False
        self.working = False
        self.seed = False
        self.closed = False

        self.status_msg = ''
        self.status_err = ['']      # error history; last entry is shown
        self.status_errtime = 0     # clock() value of the latest error
        self.status_done = 0.0      # fraction reported through display()

        # Filled in by start() / hashcheck_callback(); defined here so the
        # attributes always exist.
        self._hashcheckfunc = None
        self.statsfunc = None

        self.rawserver = controller.handler.newRawServer(hash, self.doneflag)

        d = BT1Download(self.display, self.finished, self.error,
                        controller.exchandler, self.doneflag, config, response,
                        hash, myid, self.rawserver, controller.listen_port)
        self.d = d

    def start(self):
        """Pick the save location, initialise files and queue the hash check.

        Shuts this download down (non-quietly) if either saveAs() or
        initFiles() of the BT1Download returns a false value.
        """
        if not self.d.saveAs(self.saveAs):
            self._shutdown()
            return
        self._hashcheckfunc = self.d.initFiles()
        if not self._hashcheckfunc:
            self._shutdown()
            return
        self.controller.hashchecksched(self.hash)

    def saveAs(self, name, length, saveas, isdir):
        """Callback given to BT1Download.saveAs; defers to the controller.

        ``length`` is accepted but not used.
        """
        return self.controller.saveAs(self.hash, name, saveas, isdir)

    def hashcheck_start(self, donefunc):
        """Begin the hash check, calling donefunc when it is complete.

        If the download has already been flagged done it is shut down
        instead.
        """
        if self.is_dead():
            self._shutdown()
            return
        self.waiting = False
        self.checking = True
        self._hashcheckfunc(donefunc)

    def hashcheck_callback(self):
        """Hash check finished: start the engine, rerequester and listener.

        Shuts the download down if it was flagged done in the meantime or if
        startEngine() returns a false value.
        """
        self.checking = False
        if self.is_dead():
            self._shutdown()
            return
        if not self.d.startEngine(ratelimiter = self.controller.ratelimiter):
            self._shutdown()
            return
        self.d.startRerequester()
        self.statsfunc = self.d.startStats()
        self.rawserver.start_listening(self.d.getPortHandler())
        self.working = True

    def is_dead(self):
        """Return whether this download's done flag has been set."""
        return self.doneflag.isSet()

    def _shutdown(self):
        """Shut down and report the death to the controller."""
        self.shutdown(False)

    def shutdown(self, quiet=True):
        """Stop the download; safe to call more than once.

        quiet -- when false, controller.died() is called in addition to
                 controller.was_stopped().
        """
        if self.closed:
            return
        self.doneflag.set()
        self.rawserver.shutdown()
        # The BT1Download is only shut down once the hash check has begun.
        if self.checking or self.working:
            self.d.shutdown()
        self.waiting = False
        self.checking = False
        self.working = False
        self.closed = True
        self.controller.was_stopped(self.hash)
        if not quiet:
            self.controller.died(self.hash)

    def display(self, activity = None, fractionDone = None):
        """Record a status message and/or completed fraction.

        A false ``activity`` (None or '') leaves the previous message in
        place; ``fractionDone`` is stored as a float when not None.
        """
        # really only used by StorageWrapper now
        if activity:
            self.status_msg = activity
        if fractionDone is not None:
            self.status_done = float(fractionDone)

    def finished(self):
        """Callback for download completion: mark this torrent as seeding."""
        self.seed = True

    def error(self, msg):
        """Record an error message and the time it happened.

        If the done flag is already set, the download is shut down
        (non-quietly) before the message is recorded.
        """
        if self.doneflag.isSet():
            self._shutdown()
        self.status_err.append(msg)
        self.status_errtime = clock()
146
 
147
 
148
class LaunchMany:
    """Run every torrent found in a directory on one shared RawServer.

    The torrent directory is rescanned periodically; torrents that appear
    are started as SingleDownload objects, torrents that disappear are shut
    down.  Hash checks are run one at a time through a FIFO queue, and a
    status table is pushed to the ``Output`` object at a fixed interval.
    """

    def __init__(self, config, Output):
        """Bind a port, schedule the periodic tasks and run the server.

        config -- mapping; keys read here are torrent_dir,
                  parse_dir_interval, display_interval,
                  timeout_check_interval, timeout, ipv6_enabled,
                  upnp_nat_access, minport, maxport, bind, ipv6_binds_v4,
                  random_port, upload_unit_size and max_upload_rate.
        Output -- reporting object providing message(), exception() and
                  display().

        The constructor itself drives the whole session: it only returns
        after handler.listen_forever() returns (presumably once doneflag is
        set -- see stats()) and all downloads have been shut down, or after
        a failure to bind.  Any exception is formatted and passed to
        Output.exception() rather than propagated.
        """
        try:
            self.config = config
            self.Output = Output

            self.torrent_dir = config['torrent_dir']
            self.torrent_cache = {}
            self.file_cache = {}
            self.blocked_files = {}
            self.scan_period = config['parse_dir_interval']
            self.stats_period = config['display_interval']

            self.torrent_list = []      # hashes, in the order they were added
            self.downloads = {}         # hash -> SingleDownload
            self.counter = 0            # feeds the per-torrent peer id
            self.doneflag = Event()

            self.hashcheck_queue = []       # hashes waiting for a hash check
            self.hashcheck_current = None   # hash being checked, if any

            self.rawserver = RawServer(self.doneflag, config['timeout_check_interval'],
                              config['timeout'], ipv6_enable = config['ipv6_enabled'],
                              failfunc = self.failed, errorfunc = self.exchandler)
            upnp_type = UPnP_test(config['upnp_nat_access'])
            while True:
                try:
                    self.listen_port = self.rawserver.find_and_bind(
                                    config['minport'], config['maxport'], config['bind'],
                                    ipv6_socket_style = config['ipv6_binds_v4'],
                                    upnp = upnp_type, randomizer = config['random_port'])
                    break
                except socketerror:
                    # sys.exc_info() is used instead of "except X, e" so the
                    # statement parses on both Python 2 and Python 3.
                    e = sys.exc_info()[1]
                    if upnp_type and e == UPnP_ERROR:
                        # Retry once more without UPnP.
                        self.Output.message('WARNING: COULD NOT FORWARD VIA UPnP')
                        upnp_type = 0
                        continue
                    self.failed("Couldn't listen - " + str(e))
                    return

            self.ratelimiter = RateLimiter(self.rawserver.add_task,
                                           config['upload_unit_size'])
            self.ratelimiter.set_upload_rate(config['max_upload_rate'])

            self.handler = MultiHandler(self.rawserver, self.doneflag)
            seed(createPeerID())
            self.rawserver.add_task(self.scan, 0)
            self.rawserver.add_task(self.stats, 0)

            self.handler.listen_forever()

            self.Output.message('shutting down')
            # Emptied first so that shutting a download down cannot start
            # the next queued hash check (see was_stopped()).
            self.hashcheck_queue = []
            for hash in self.torrent_list:
                self.Output.message('dropped "'+self.torrent_cache[hash]['path']+'"')
                self.downloads[hash].shutdown()
            self.rawserver.shutdown()

        except:
            # Deliberate catch-all at the top-level boundary: report the
            # traceback through Output instead of letting it escape.
            data = StringIO()
            print_exc(file = data)
            Output.exception(data.getvalue())

    def scan(self):
        """Rescan the torrent directory and apply additions and removals.

        Reschedules itself every ``scan_period`` seconds.
        """
        self.rawserver.add_task(self.scan, self.scan_period)

        r = parsedir(self.torrent_dir, self.torrent_cache,
                     self.file_cache, self.blocked_files,
                     return_metainfo = True, errfunc = self.Output.message)

        ( self.torrent_cache, self.file_cache, self.blocked_files,
            added, removed ) = r

        for hash, data in removed.items():
            self.Output.message('dropped "'+data['path']+'"')
            self.remove(hash)
        for hash, data in added.items():
            self.Output.message('added "'+data['path']+'"')
            self.add(hash, data)

    def stats(self):
        """Collect one status row per torrent and hand them to Output.

        Each row is the tuple (name, status, progress, peers, seeds,
        seedsmsg, dist, uprate, dnrate, upamt, dnamt, size, t, msg).
        Reschedules itself every ``stats_period`` seconds.  A true return
        value from Output.display() sets the done flag.
        """
        self.rawserver.add_task(self.stats, self.stats_period)
        data = []
        for hash in self.torrent_list:
            cache = self.torrent_cache[hash]
            if self.config['display_path']:
                name = cache['path']
            else:
                name = cache['name']
            size = cache['length']
            d = self.downloads[hash]
            # Defaults for torrents that have no live statistics yet.
            progress = '0.0%'
            peers = 0
            seeds = 0
            seedsmsg = "S"
            dist = 0.0
            uprate = 0.0
            dnrate = 0.0
            upamt = 0
            dnamt = 0
            t = 0
            if d.is_dead():
                status = 'stopped'
            elif d.waiting:
                status = 'waiting for hash check'
            elif d.checking:
                status = d.status_msg
                progress = '%.1f%%' % (d.status_done*100)
            else:
                stats = d.statsfunc()
                s = stats['stats']
                if d.seed:
                    status = 'seeding'
                    progress = '100.0%'
                    seeds = s.numOldSeeds
                    seedsmsg = "s"
                    dist = s.numCopies
                else:
                    if s.numSeeds + s.numPeers:
                        t = stats['time']
                        if t == 0:  # unlikely
                            t = 0.01
                        status = fmttime(t)
                    else:
                        t = -1
                        status = 'connecting to peers'
                    progress = '%.1f%%' % (int(stats['frac']*1000)/10.0)
                    seeds = s.numSeeds
                    dist = s.numCopies2
                    dnrate = stats['down']
                peers = s.numPeers
                uprate = stats['up']
                upamt = s.upTotal
                dnamt = s.downTotal

            # Show the latest error for dead torrents, and for live ones
            # only while it is less than 300 seconds old.
            if d.is_dead() or d.status_errtime+300 > clock():
                msg = d.status_err[-1]
            else:
                msg = ''

            data.append(( name, status, progress, peers, seeds, seedsmsg, dist,
                          uprate, dnrate, upamt, dnamt, size, t, msg ))
        stop = self.Output.display(data)
        if stop:
            self.doneflag.set()

    def remove(self, hash):
        """Shut down and forget the download registered under ``hash``."""
        self.torrent_list.remove(hash)
        self.downloads[hash].shutdown()
        del self.downloads[hash]

    def add(self, hash, data):
        """Create and start a SingleDownload for a newly found torrent.

        data -- parsedir entry; its 'metainfo' value is given to the
                download.

        The peer id is derived from a 3-character encoding (6 bits per
        character, via mapbase64) of a per-controller counter.
        """
        c = self.counter
        self.counter += 1
        x = ''
        for i in range(3):
            x = mapbase64[c & 0x3F]+x
            c >>= 6
        peer_id = createPeerID(x)
        d = SingleDownload(self, hash, data['metainfo'], self.config, peer_id)
        self.torrent_list.append(hash)
        self.downloads[hash] = d
        d.start()

    def _mkdir_for(self, saveas, torrent_path):
        """Create directory ``saveas``; raise OSError naming the torrent on failure."""
        try:
            os.mkdir(saveas)
        except Exception:
            raise OSError("couldn't create directory for "+torrent_path
                                  +" ("+saveas+")")

    def saveAs(self, hash, name, saveas, isdir):
        """Work out (and if needed create) the save path for a torrent.

        hash   -- torrent key into torrent_cache.
        name   -- name of the torrent's content.
        saveas -- base directory, or a false value to save next to the
                  torrent file.
        isdir  -- true when the content is a directory.

        With config['saveas_style'] 1 or 3 the path is the torrent file's
        name with its '.<type>' suffix removed; style 3 additionally makes
        that path a directory and, for single-file content, places ``name``
        inside it.  Any other style joins ``name`` to ``saveas`` or to the
        torrent file's directory.

        Returns the resulting path.  Raises OSError if a required directory
        cannot be created.
        """
        x = self.torrent_cache[hash]
        style = self.config['saveas_style']
        if style == 1 or style == 3:
            if saveas:
                saveas = os.path.join(saveas,x['file'][:-1-len(x['type'])])
            else:
                saveas = x['path'][:-1-len(x['type'])]
            if style == 3:
                if not os.path.isdir(saveas):
                    self._mkdir_for(saveas, x['path'])
                if not isdir:
                    saveas = os.path.join(saveas, name)
        else:
            if saveas:
                saveas = os.path.join(saveas, name)
            else:
                saveas = os.path.join(os.path.split(x['path'])[0], name)

        if isdir and not os.path.isdir(saveas):
            self._mkdir_for(saveas, x['path'])
        return saveas

    def hashchecksched(self, hash = None):
        """Queue ``hash`` for a hash check and start one if none is running.

        Called without a hash it only tries to start the next queued check;
        with an empty queue that is a no-op.
        """
        if hash:
            self.hashcheck_queue.append(hash)
        if not self.hashcheck_current and self.hashcheck_queue:
            self._hashcheck_start()

    def _hashcheck_start(self):
        """Take the next hash off the queue and start its hash check."""
        self.hashcheck_current = self.hashcheck_queue.pop(0)
        self.downloads[self.hashcheck_current].hashcheck_start(self.hashcheck_callback)

    def hashcheck_callback(self):
        """Current hash check is done: notify the download, start the next."""
        finished = self.hashcheck_current
        self.downloads[finished].hashcheck_callback()
        if self.hashcheck_current != finished:
            # The download shut itself down inside its callback, so
            # was_stopped() has already cleared hashcheck_current and, if
            # anything was queued, started the next check.  Advancing again
            # here would start a second check concurrently or clear the
            # marker of the one now running.
            return
        if self.hashcheck_queue:
            self._hashcheck_start()
        else:
            self.hashcheck_current = None

    def died(self, hash):
        """Report a download that stopped on its own, if it is still known."""
        if hash in self.torrent_cache:
            self.Output.message('DIED: "'+self.torrent_cache[hash]['path']+'"')

    def was_stopped(self, hash):
        """Drop ``hash`` from the hash-check bookkeeping after a shutdown.

        If it was the torrent being checked, the next queued check starts.
        """
        try:
            self.hashcheck_queue.remove(hash)
        except ValueError:
            pass    # it was not queued
        if self.hashcheck_current == hash:
            self.hashcheck_current = None
            if self.hashcheck_queue:
                self._hashcheck_start()

    def failed(self, s):
        """Report a failure message through Output."""
        self.Output.message('FAILURE: '+s)

    def exchandler(self, s):
        """Report exception text through Output."""
        self.Output.exception(s)