diff options
Diffstat (limited to 'abs/core/LinHES-config/discover_infinitv.py')
-rwxr-xr-x | abs/core/LinHES-config/discover_infinitv.py | 84 |
1 files changed, 84 insertions, 0 deletions
diff --git a/abs/core/LinHES-config/discover_infinitv.py b/abs/core/LinHES-config/discover_infinitv.py new file mode 100755 index 0000000..5c39c59 --- /dev/null +++ b/abs/core/LinHES-config/discover_infinitv.py @@ -0,0 +1,84 @@ +#!/usr/bin/env python2 +# Program to detect Ceton InfiniTV tuners on the network +# Program will only listen on a specific interface for a set amount of time. +# If during that time it finds a device it will print out the location. +# discover_infinitv $ip $timeout +# ex: discover_infinitv 192.168.200.2 + +import socket +import thread +import time,sys + + +class find_the_infiniTV(object): + + def __init__(self,UPNP_GROUP="239.255.255.250", UPNP_PORT=1900, interface_ip='127.0.0.1'): + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind(('', UPNP_PORT)) + mreq = socket.inet_aton(UPNP_GROUP)+socket.inet_aton(interface_ip) + sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) + self.sock=sock + self.devices = {} + self.emit_msearch() + + def emit_msearch(self): + msearch_string = 'M-SEARCH * HTTP/1.1\r\nHOST: 239.255.255.250:1900\r\nMAN: "ssdp:discover"\r\nMX: 2\r\nST: urn:schemas-upnp-org:device:MediaRenderer:1\r\n\r\n' + msearch_encoded_string = msearch_string.encode() + self.sock.sendto( msearch_encoded_string, ('239.255.255.250', 1900 ) ) + + # Internally used method to read a notification + def _listen_notification(self, data): + upnp_header, upnp_body = data.split("\r\n\r\n") + + # Get header by line -- as fragments + upnp_hfrags = upnp_header.split("\r\n") + upnp_hfrags.pop(0) + + header = {} + + for frag in upnp_hfrags: + splitpoint = frag.find(": ") + header[frag[:splitpoint]] = frag[splitpoint+2:] + + if("USN" in header): + if("CetonFactory" in header["USN"]): + if(header["USN"] not in self.devices): + self.devices[header["USN"]] = header["USN"] + print("Found InfiniTV. Location URL: {0}".format(header["LOCATION"])) + + + def listen(self): + import select + self.listening = True + while self.listening: + upnp_data = self.sock.recv(10240).decode('utf-8') + #Filter by type (we only care about NOTIFY for now) + if(upnp_data[0:6] == "NOTIFY"): + self._listen_notification(upnp_data) + + def stop_listen(self): + self.listening = False + if len(self.devices) == 0: + print "No InfiniTV found" + + +if __name__ == "__main__": + # Create a default UPnP socket + try: + interface_ip=sys.argv[1] + except: + print "detect_infinitv $ip_address" + sys.exit(1) + + try: + timeout = float(sys.argv[2]) + except: + timeout=5 + L = find_the_infiniTV(interface_ip=interface_ip) + thread.start_new_thread(L.listen, ()) + time.sleep(timeout) + L.stop_listen() + + + |