Subversion Repositories svnkaklik

Rev

Details | Last modification | View Log

Rev Author Line No. Line
36 kaklik 1
# Written by John Hoffman
2
# see LICENSE.txt for license information
3
 
4
from cStringIO import StringIO
5
#from RawServer import RawServer
6
try:
7
    True
8
except:
9
    True = 1
10
    False = 0
11
 
12
from BT1.Encrypter import protocol_name
13
 
14
# Sentinel used as the default ``id`` of SingleRawServer.add_task().  It is
# compared by identity, so a caller can still pass any real value
# (including None) as an explicit task id.
default_task_id = []


class SingleRawServer:
    """Per-torrent front end to the raw server shared through a MultiHandler.

    The object forwards task scheduling and outgoing connections to
    ``multihandler.rawserver``.  Tasks are tagged with this torrent's
    ``info_hash`` by default so that they can be killed as a group when
    the torrent is shut down.
    """

    def __init__(self, info_hash, multihandler, doneflag, protocol):
        """Store the torrent identity and the shared server objects.

        info_hash    -- key identifying this torrent; also the default task id
        multihandler -- owner object; must provide ``rawserver`` and
                        ``shutdown_torrent(info_hash)``
        doneflag     -- stored as-is (MultiHandler.listen_forever calls
                        ``set()`` on it)
        protocol     -- protocol name compared against incoming handshakes
        """
        self.info_hash = info_hash
        self.doneflag = doneflag
        self.protocol = protocol
        self.multihandler = multihandler
        self.rawserver = multihandler.rawserver
        self.finished = False   # set once, by _shutdown()
        self.running = False    # True between start_listening() and _shutdown()
        self.handler = None     # installed by start_listening()
        self.taskqueue = []     # not used anywhere in this class

    def shutdown(self):
        """Ask the multihandler to shut this torrent down (no-op once finished)."""
        if not self.finished:
            self.multihandler.shutdown_torrent(self.info_hash)

    def _shutdown(self):
        """Do the actual teardown; intended to be called by the multihandler.

        Marks the server finished, kills every task registered under this
        torrent's info_hash and closes all connections of the handler, if
        one was installed.  Calling it again has no effect.
        """
        if not self.finished:
            self.finished = True
            self.running = False
            self.rawserver.kill_tasks(self.info_hash)
            if self.handler:
                self.handler.close_all()

    def _external_connection_made(self, c, options, already_read):
        """Take over a connection whose handshake header was already read.

        c            -- the connection object
        options      -- the reserved bytes read from the header
        already_read -- data received after the header, not yet processed

        While the server is running the connection is handed to the
        installed handler.  Otherwise nothing will ever service the socket
        (no handler is attached to it), so it is closed instead of being
        left open indefinitely.
        """
        if not self.running:
            c.close()
            return
        c.set_handler(self.handler)
        self.handler.externally_handshaked_connection_made(
            c, options, already_read)

    ### RawServer functions ###

    def add_task(self, func, delay=0, id = default_task_id):
        """Schedule ``func`` on the shared raw server unless already finished.

        When ``id`` is left at its default the task is registered under
        this torrent's info_hash, which is the id _shutdown() kills.
        """
        if id is default_task_id:
            id = self.info_hash
        if not self.finished:
            self.rawserver.add_task(func, delay, id)

#    def bind(self, port, bind = '', reuse = False):
#        pass    # not handled here

    def start_connection(self, dns, handler = None):
        """Open an outgoing connection through the shared raw server.

        A false ``handler`` (the default) selects the handler installed by
        start_listening().  Returns whatever the raw server returns.
        """
        if not handler:
            handler = self.handler
        c = self.rawserver.start_connection(dns, handler)
        return c

#    def listen_forever(self, handler):
#        pass    # don't call with this

    def start_listening(self, handler):
        """Install ``handler`` and start accepting handed-off connections.

        Returns the bound ``shutdown`` method so the caller can stop the
        torrent later.
        """
        self.handler = handler
        self.running = True
        return self.shutdown    # obviously, doesn't listen forever

    def is_finished(self):
        """Return True once _shutdown() has run."""
        return self.finished

    def get_exception_flag(self):
        """Return the exception flag of the shared raw server."""
        return self.rawserver.get_exception_flag()
76
 
77
 
78
class NewSocketHandler:     # hand a new socket off where it belongs
    """Temporary handler that parses the header of an incoming connection.

    It installs itself as the connection's handler, reads the header
    piece by piece (length byte, protocol name, 8 reserved bytes, 20-byte
    download id) and, when the download id names a registered
    SingleRawServer speaking the same protocol, passes the connection on
    to it.  Any other outcome closes the connection.
    """

    def __init__(self, multihandler, connection):
        """Attach to ``connection`` and arm the auto-close task.

        multihandler -- must provide ``rawserver`` and the
                        ``singlerawservers`` mapping
        connection   -- object providing ``set_handler`` and ``close``
        """
        self.multihandler = multihandler
        self.connection = connection
        connection.set_handler(self)
        self.closed = False
        self.buffer = StringIO()
        self.complete = False
        # Parser state: number of bytes wanted next and the method that
        # will consume them.
        self.next_len, self.next_func = 1, self.read_header_len
        # Give up on peers that have not finished the header after a delay
        # of 15 (presumably seconds -- the unit is defined by the raw server).
        self.multihandler.rawserver.add_task(self._auto_close, 15)

    def _auto_close(self):
        """Close the connection if the header was never completed."""
        if not self.complete:
            self.close()

    def close(self):
        """Close the connection once; later calls do nothing."""
        if not self.closed:
            self.connection.close()
            self.closed = True


#   header format:
#        connection.write(chr(len(protocol_name)) + protocol_name +
#            (chr(0) * 8) + self.encrypter.download_id + self.encrypter.my_id)

    # copied from Encrypter and modified
    #
    # Each read_* method receives exactly the bytes it asked for and
    # returns either (next_len, next_func), True (header accepted) or
    # None (reject and close).

    def read_header_len(self, s):
        """Consume the one-byte length prefix of the protocol name."""
        l = ord(s)
        return l, self.read_header

    def read_header(self, s):
        """Remember the protocol name; the 8 reserved bytes come next."""
        self.protocol = s
        return 8, self.read_reserved

    def read_reserved(self, s):
        """Remember the reserved bytes; the 20-byte download id comes next."""
        self.options = s
        return 20, self.read_download_id

    def read_download_id(self, s):
        """Accept the header only for a known download id with a matching protocol."""
        servers = self.multihandler.singlerawservers
        # "in" instead of dict.has_key(), which no longer exists in Python 3.
        if s in servers and servers[s].protocol == self.protocol:
            return True
        return None

    def read_dead(self, s):
        """Parser state entered after an error: reject whatever arrives."""
        return None

    def data_came_in(self, garbage, s):
        """Feed received data ``s`` through the header parser.

        The first argument is ignored.  Data is accumulated until the
        current parser step has all the bytes it asked for; leftover bytes
        are carried into the next step.  Once the header is accepted, the
        connection, the reserved bytes and any bytes received beyond the
        header are passed to the matching SingleRawServer.
        """
        while True:
            if self.closed:
                return
            i = self.next_len - self.buffer.tell()
            if i > len(s):
                # Still short of a full field: stash and wait for more.
                self.buffer.write(s)
                return
            self.buffer.write(s[:i])
            s = s[i:]
            m = self.buffer.getvalue()
            # Empty the buffer for the next field.  seek(0) is used rather
            # than reset(), which only cStringIO objects provide.
            self.buffer.seek(0)
            self.buffer.truncate()
            try:
                x = self.next_func(m)
            except:
                # Park the parser in the dead state before propagating, so
                # a later call cannot resume from a half-processed field.
                self.next_len, self.next_func = 1, self.read_dead
                raise
            if x is None:
                self.close()
                return
            if x is True:       # ready to process
                # m is the download id that read_download_id just accepted.
                self.multihandler.singlerawservers[m]._external_connection_made(
                        self.connection, self.options, s)
                self.complete = True
                return
            self.next_len, self.next_func = x

    def connection_flushed(self, ss):
        """Nothing is ever written here, so there is nothing to do."""
        pass

    def connection_lost(self, ss):
        """Record that the connection is gone so that it is not closed again."""
        self.closed = True
159
 
160
class MultiHandler:
    """Share one raw server between several torrents.

    Each torrent gets a SingleRawServer registered under its info_hash.
    Incoming connections are given to a NewSocketHandler, which looks the
    target torrent up in ``singlerawservers``.
    """

    def __init__(self, rawserver, doneflag):
        """Remember the shared raw server and the master done flag."""
        self.rawserver = rawserver
        self.masterdoneflag = doneflag
        self.singlerawservers = {}  # info_hash -> SingleRawServer
        self.connections = {}       # not used by the methods shown here
        self.taskqueues = {}        # not used by the methods shown here

    def newRawServer(self, info_hash, doneflag, protocol=protocol_name):
        """Create, register and return the SingleRawServer for ``info_hash``.

        An existing entry under the same info_hash is replaced.
        """
        new = SingleRawServer(info_hash, self, doneflag, protocol)
        self.singlerawservers[info_hash] = new
        return new

    def shutdown_torrent(self, info_hash):
        """Shut down and unregister the torrent registered under ``info_hash``.

        Raises KeyError if no such torrent is registered.
        """
        self.singlerawservers[info_hash]._shutdown()
        del self.singlerawservers[info_hash]

    def listen_forever(self):
        """Run the shared raw server with this object as its handler.

        When the server loop ends -- normally or by raising -- every torrent
        still registered is marked finished and its done flag is set, so
        nothing is left waiting on a torrent whose server has stopped.
        """
        try:
            self.rawserver.listen_forever(self)
        finally:
            for srs in self.singlerawservers.values():
                srs.finished = True
                srs.running = False
                srs.doneflag.set()

    ### RawServer handler functions ###
    # be wary of name collisions

    def external_connection_made(self, ss):
        """Start reading the header of a newly accepted connection.

        The NewSocketHandler registers itself as the connection's handler,
        so the instance does not need to be kept here.
        """
        NewSocketHandler(self, ss)