Subversion Repositories svnkaklik

Rev

Details | Last modification | View Log

Rev Author Line No. Line
36 kaklik 1
# Written by John Hoffman
2
# derived from NATPortMapping.py by Yejun Yang
3
# and from example code by Myers Carpenter
4
# see LICENSE.txt for license information
5
 
6
import socket
7
from traceback import print_exc
8
from subnetparse import IP_List
9
from clock import clock
10
from __init__ import createPeerID
11
try:
12
    True
13
except:
14
    True = 1
15
    False = 0
16
 
17
DEBUG = False
18
 
19
EXPIRE_CACHE = 30 # seconds
20
ID = "BT-"+createPeerID()[-4:]
21
 
22
try:
23
    import pythoncom, win32com.client
24
    _supported = 1
25
except ImportError:
26
    _supported = 0
27
 
28
 
29
 
30
class _UPnP1:   # derived from Myers Carpenter's code
31
                # seems to use the machine's local UPnP
32
                # system for its operation.  Runs fairly fast
33
 
34
    def __init__(self):
35
        self.map = None
36
        self.last_got_map = -10e10
37
 
38
    def _get_map(self):
39
        if self.last_got_map + EXPIRE_CACHE < clock():
40
            try:
41
                dispatcher = win32com.client.Dispatch("HNetCfg.NATUPnP")
42
                self.map = dispatcher.StaticPortMappingCollection
43
                self.last_got_map = clock()
44
            except:
45
                self.map = None
46
        return self.map
47
 
48
    def test(self):
49
        try:
50
            assert self._get_map()     # make sure a map was found
51
            success = True
52
        except:
53
            success = False
54
        return success
55
 
56
 
57
    def open(self, ip, p):
58
        map = self._get_map()
59
        try:
60
            map.Add(p,'TCP',p,ip,True,ID)
61
            if DEBUG:
62
                print 'port opened: '+ip+':'+str(p)
63
            success = True
64
        except:
65
            if DEBUG:
66
                print "COULDN'T OPEN "+str(p)
67
                print_exc()
68
            success = False
69
        return success
70
 
71
 
72
    def close(self, p):
73
        map = self._get_map()
74
        try:
75
            map.Remove(p,'TCP')
76
            success = True
77
            if DEBUG:
78
                print 'port closed: '+str(p)
79
        except:
80
            if DEBUG:
81
                print 'ERROR CLOSING '+str(p)
82
                print_exc()
83
            success = False
84
        return success
85
 
86
 
87
    def clean(self, retry = False):
88
        if not _supported:
89
            return
90
        try:
91
            map = self._get_map()
92
            ports_in_use = []
93
            for i in xrange(len(map)):
94
                try:
95
                    mapping = map[i]
96
                    port = mapping.ExternalPort
97
                    prot = str(mapping.Protocol).lower()
98
                    desc = str(mapping.Description).lower()
99
                except:
100
                    port = None
101
                if port and prot == 'tcp' and desc[:3] == 'bt-':
102
                    ports_in_use.append(port)
103
            success = True
104
            for port in ports_in_use:
105
                try:
106
                    map.Remove(port,'TCP')
107
                except:
108
                    success = False
109
            if not success and not retry:
110
                self.clean(retry = True)
111
        except:
112
            pass
113
 
114
 
115
class _UPnP2:   # derived from Yejun Yang's code
116
                # apparently does a direct search for UPnP hardware
117
                # may work in some cases where _UPnP1 won't, but is slow
118
                # still need to implement "clean" method
119
 
120
    def __init__(self):
121
        self.services = None
122
        self.last_got_services = -10e10
123
 
124
    def _get_services(self):
125
        if not self.services or self.last_got_services + EXPIRE_CACHE < clock():
126
            self.services = []
127
            try:
128
                f=win32com.client.Dispatch("UPnP.UPnPDeviceFinder")
129
                for t in ( "urn:schemas-upnp-org:service:WANIPConnection:1",
130
                           "urn:schemas-upnp-org:service:WANPPPConnection:1" ):
131
                    try:
132
                        conns = f.FindByType(t,0)
133
                        for c in xrange(len(conns)):
134
                            try:
135
                                svcs = conns[c].Services
136
                                for s in xrange(len(svcs)):
137
                                    try:
138
                                        self.services.append(svcs[s])
139
                                    except:
140
                                        pass
141
                            except:
142
                                pass
143
                    except:
144
                        pass
145
            except:
146
                pass
147
            self.last_got_services = clock()
148
        return self.services
149
 
150
    def test(self):
151
        try:
152
            assert self._get_services()    # make sure some services can be found
153
            success = True
154
        except:
155
            success = False
156
        return success
157
 
158
 
159
    def open(self, ip, p):
160
        svcs = self._get_services()
161
        success = False
162
        for s in svcs:
163
            try:
164
                s.InvokeAction('AddPortMapping',['',p,'TCP',p,ip,True,ID,0],'')
165
                success = True
166
            except:
167
                pass
168
        if DEBUG and not success:
169
            print "COULDN'T OPEN "+str(p)
170
            print_exc()
171
        return success
172
 
173
 
174
    def close(self, p):
175
        svcs = self._get_services()
176
        success = False
177
        for s in svcs:
178
            try:
179
                s.InvokeAction('DeletePortMapping', ['',p,'TCP'], '')
180
                success = True
181
            except:
182
                pass
183
        if DEBUG and not success:
184
            print "COULDN'T OPEN "+str(p)
185
            print_exc()
186
        return success
187
 
188
 
189
class _UPnP:    # master holding class
190
    def __init__(self):
191
        self.upnp1 = _UPnP1()
192
        self.upnp2 = _UPnP2()
193
        self.upnplist = (None, self.upnp1, self.upnp2)
194
        self.upnp = None
195
        self.local_ip = None
196
        self.last_got_ip = -10e10
197
 
198
    def get_ip(self):
199
        if self.last_got_ip + EXPIRE_CACHE < clock():
200
            local_ips = IP_List()
201
            local_ips.set_intranet_addresses()
202
            try:
203
                for info in socket.getaddrinfo(socket.gethostname(),0,socket.AF_INET):
204
                            # exception if socket library isn't recent
205
                    self.local_ip = info[4][0]
206
                    if local_ips.includes(self.local_ip):
207
                        self.last_got_ip = clock()
208
                        if DEBUG:
209
                            print 'Local IP found: '+self.local_ip
210
                        break
211
                else:
212
                    raise ValueError('couldn\'t find intranet IP')
213
            except:
214
                self.local_ip = None
215
                if DEBUG:
216
                    print 'Error finding local IP'
217
                    print_exc()
218
        return self.local_ip
219
 
220
    def test(self, upnp_type):
221
        if DEBUG:
222
            print 'testing UPnP type '+str(upnp_type)
223
        if not upnp_type or not _supported or self.get_ip() is None:
224
            if DEBUG:
225
                print 'not supported'
226
            return 0
227
        pythoncom.CoInitialize()                # leave initialized
228
        self.upnp = self.upnplist[upnp_type]    # cache this
229
        if self.upnp.test():
230
            if DEBUG:
231
                print 'ok'
232
            return upnp_type
233
        if DEBUG:
234
            print 'tested bad'
235
        return 0
236
 
237
    def open(self, p):
238
        assert self.upnp, "must run UPnP_test() with the desired UPnP access type first"
239
        return self.upnp.open(self.get_ip(), p)
240
 
241
    def close(self, p):
242
        assert self.upnp, "must run UPnP_test() with the desired UPnP access type first"
243
        return self.upnp.close(p)
244
 
245
    def clean(self):
246
        return self.upnp1.clean()
247
 
248
_upnp_ = _UPnP()
249
 
250
UPnP_test = _upnp_.test
251
UPnP_open_port = _upnp_.open
252
UPnP_close_port = _upnp_.close
253
UPnP_reset = _upnp_.clean
254