Blame | Last modification | View Log | Download
# Written by John Hoffman
# derived from NATPortMapping.py by Yejun Yang
# and from example code by Myers Carpenter
# see LICENSE.txt for license information

"""UPnP port-mapping helpers built on the Windows COM UPnP interfaces.

Two back ends are provided (``_UPnP1`` and ``_UPnP2``); ``_UPnP`` selects one
of them by number.  The module exposes four callables bound to a single
shared ``_UPnP`` instance: ``UPnP_test``, ``UPnP_open_port``,
``UPnP_close_port`` and ``UPnP_reset``.

When ``pythoncom`` / ``win32com`` cannot be imported, ``_supported`` is 0 and
``UPnP_test`` always returns 0.
"""

import socket
from traceback import print_exc
from subnetparse import IP_List
from clock import clock
from __init__ import createPeerID

DEBUG = False

EXPIRE_CACHE = 30  # seconds

# Description attached to every mapping this module creates; clean() looks
# for the (lower-cased) "bt-" prefix to recognise mappings to remove.
ID = "BT-" + createPeerID()[-4:]

try:
    import pythoncom
    import win32com.client
    _supported = 1
except ImportError:
    _supported = 0


class _UPnP1:
    """Back end using the "HNetCfg.NATUPnP" COM object.

    Derived from Myers Carpenter's code.  Seems to use the machine's local
    UPnP system for its operation.  Runs fairly fast.
    """

    def __init__(self):
        self.map = None
        # far enough in the past that the first _get_map() call refreshes
        self.last_got_map = -10e10

    def _get_map(self):
        """Return the static port mapping collection, or None on failure.

        The collection is re-fetched once the cached copy is older than
        EXPIRE_CACHE seconds.
        """
        if self.last_got_map + EXPIRE_CACHE < clock():
            try:
                dispatcher = win32com.client.Dispatch("HNetCfg.NATUPnP")
                self.map = dispatcher.StaticPortMappingCollection
                self.last_got_map = clock()
            except Exception:
                self.map = None
        return self.map

    def test(self):
        """Return True if a port mapping collection could be obtained."""
        # An explicit check instead of ``assert``: asserts are stripped
        # under ``python -O``, which would make this always succeed.
        try:
            return self._get_map() is not None
        except Exception:
            return False

    def open(self, ip, p):
        """Map external TCP port *p* to *ip*:*p*; return True on success."""
        mappings = self._get_map()
        try:
            # a missing collection (None) fails here and is reported below
            mappings.Add(p, 'TCP', p, ip, True, ID)
            if DEBUG:
                print('port opened: ' + ip + ':' + str(p))
            success = True
        except Exception:
            if DEBUG:
                print("COULDN'T OPEN " + str(p))
                print_exc()
            success = False
        return success

    def close(self, p):
        """Remove the TCP mapping for external port *p*; return True on success."""
        mappings = self._get_map()
        try:
            mappings.Remove(p, 'TCP')
            success = True
            if DEBUG:
                print('port closed: ' + str(p))
        except Exception:
            if DEBUG:
                print('ERROR CLOSING ' + str(p))
                print_exc()
            success = False
        return success

    def clean(self, retry=False):
        """Remove every TCP mapping whose description starts with "bt-".

        Best effort: all errors are swallowed.  If any removal fails, the
        whole procedure is attempted one more time (*retry* marks that
        second pass and prevents further recursion).
        """
        if not _supported:
            return
        try:
            mappings = self._get_map()
            ports_in_use = []
            for i in range(len(mappings)):
                try:
                    mapping = mappings[i]
                    port = mapping.ExternalPort
                    prot = str(mapping.Protocol).lower()
                    desc = str(mapping.Description).lower()
                except Exception:
                    # unreadable entry: skip it
                    continue
                if port and prot == 'tcp' and desc[:3] == 'bt-':
                    ports_in_use.append(port)
            success = True
            for port in ports_in_use:
                try:
                    mappings.Remove(port, 'TCP')
                except Exception:
                    success = False
            if not success and not retry:
                self.clean(retry=True)
        except Exception:
            pass


class _UPnP2:
    """Back end using the "UPnP.UPnPDeviceFinder" COM object.

    Derived from Yejun Yang's code.  Apparently does a direct search for
    UPnP hardware.  May work in some cases where _UPnP1 won't, but is slow.
    Still need to implement "clean" method.
    """

    _SERVICE_TYPES = (
        "urn:schemas-upnp-org:service:WANIPConnection:1",
        "urn:schemas-upnp-org:service:WANPPPConnection:1",
    )

    def __init__(self):
        self.services = None
        self.last_got_services = -10e10

    def _get_services(self):
        """Return the list of discovered services (possibly empty).

        A new search is run when no services are cached or the cached list
        is older than EXPIRE_CACHE seconds.  Failures at any level of the
        search are ignored so that whatever was found is still returned.
        """
        if not self.services or self.last_got_services + EXPIRE_CACHE < clock():
            self.services = []
            try:
                finder = win32com.client.Dispatch("UPnP.UPnPDeviceFinder")
                for service_type in self._SERVICE_TYPES:
                    try:
                        conns = finder.FindByType(service_type, 0)
                        for c in range(len(conns)):
                            try:
                                svcs = conns[c].Services
                                for s in range(len(svcs)):
                                    try:
                                        self.services.append(svcs[s])
                                    except Exception:
                                        pass
                            except Exception:
                                pass
                    except Exception:
                        pass
            except Exception:
                pass
            self.last_got_services = clock()
        return self.services

    def test(self):
        """Return True if at least one service could be found."""
        # An explicit check instead of ``assert``: asserts are stripped
        # under ``python -O``, which would make this always succeed.
        try:
            return bool(self._get_services())
        except Exception:
            return False

    def _invoke_on_all(self, action, args):
        """Invoke *action* on every known service; True if any call succeeded."""
        success = False
        for service in self._get_services():
            try:
                # fresh argument list per call, as each call had originally
                service.InvokeAction(action, list(args), '')
                success = True
            except Exception:
                if DEBUG:
                    # printed here because a traceback is only available
                    # while the exception is being handled
                    print_exc()
        return success

    def open(self, ip, p):
        """Map external TCP port *p* to *ip*:*p*; return True on success."""
        success = self._invoke_on_all(
            'AddPortMapping', ['', p, 'TCP', p, ip, True, ID, 0])
        if DEBUG and not success:
            print("COULDN'T OPEN " + str(p))
        return success

    def close(self, p):
        """Remove the TCP mapping for external port *p*; return True on success."""
        success = self._invoke_on_all('DeletePortMapping', ['', p, 'TCP'])
        if DEBUG and not success:
            print("COULDN'T CLOSE " + str(p))
        return success


class _UPnP:
    """Master holding class: selects a back end and tracks the local IP."""

    _NOT_TESTED = "must run UPnP_test() with the desired UPnP access type first"

    def __init__(self):
        self.upnp1 = _UPnP1()
        self.upnp2 = _UPnP2()
        # indexed by UPnP access type; type 0 has no back end
        self.upnplist = (None, self.upnp1, self.upnp2)
        self.upnp = None
        self.local_ip = None
        self.last_got_ip = -10e10

    def get_ip(self):
        """Return this host's intranet IPv4 address, or None if none is found.

        A successful lookup is cached for EXPIRE_CACHE seconds; a failed one
        is retried on the next call.
        """
        if self.last_got_ip + EXPIRE_CACHE < clock():
            local_ips = IP_List()
            local_ips.set_intranet_addresses()
            try:
                # exception if socket library isn't recent
                for info in socket.getaddrinfo(socket.gethostname(), 0,
                                               socket.AF_INET):
                    self.local_ip = info[4][0]
                    if local_ips.includes(self.local_ip):
                        self.last_got_ip = clock()
                        if DEBUG:
                            print('Local IP found: ' + self.local_ip)
                        break
                else:
                    raise ValueError('couldn\'t find intranet IP')
            except Exception:
                self.local_ip = None
                if DEBUG:
                    print('Error finding local IP')
                    print_exc()
        return self.local_ip

    def test(self, upnp_type):
        """Select back end *upnp_type* and check that it works.

        Returns *upnp_type* when the back end's test passes, otherwise 0.
        0 is also returned when *upnp_type* is false, the COM modules are
        unavailable, or no intranet IP can be found.
        """
        if DEBUG:
            print('testing UPnP type ' + str(upnp_type))
        if not upnp_type or not _supported or self.get_ip() is None:
            if DEBUG:
                print('not supported')
            return 0
        pythoncom.CoInitialize()                # leave initialized
        self.upnp = self.upnplist[upnp_type]    # cache this
        if self.upnp.test():
            if DEBUG:
                print('ok')
            return upnp_type
        if DEBUG:
            print('tested bad')
        return 0

    def _selected(self):
        """Return the selected back end; raise AssertionError if none is set."""
        # Raised explicitly so the check is still performed under ``-O``.
        if not self.upnp:
            raise AssertionError(self._NOT_TESTED)
        return self.upnp

    def open(self, p):
        """Open TCP port *p* through the selected back end."""
        return self._selected().open(self.get_ip(), p)

    def close(self, p):
        """Close TCP port *p* through the selected back end."""
        return self._selected().close(p)

    def clean(self):
        """Remove leftover mappings; only the _UPnP1 back end implements this."""
        return self.upnp1.clean()


_upnp_ = _UPnP()

UPnP_test = _upnp_.test
UPnP_open_port = _upnp_.open
UPnP_close_port = _upnp_.close
UPnP_reset = _upnp_.clean