#!/usr/bin/env python2 # Program to detect Ceton InfiniTV tuners on the network # Program will only listen on a specific interface for a set amount of time. # If during that time it finds a device it will print out the location. # discover_infinitv $ip $timeout # ex: discover_infinitv 192.168.200.2 import socket import thread import time,sys class find_the_infiniTV(object): def __init__(self,UPNP_GROUP="239.255.255.250", UPNP_PORT=1900, interface_ip='127.0.0.1'): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind(('', UPNP_PORT)) mreq = socket.inet_aton(UPNP_GROUP)+socket.inet_aton(interface_ip) sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) self.sock=sock self.devices = {} self.emit_msearch() def emit_msearch(self): msearch_string = 'M-SEARCH * HTTP/1.1\r\nHOST: 239.255.255.250:1900\r\nMAN: "ssdp:discover"\r\nMX: 2\r\nST: urn:schemas-upnp-org:device:MediaRenderer:1\r\n\r\n' msearch_encoded_string = msearch_string.encode() self.sock.sendto( msearch_encoded_string, ('239.255.255.250', 1900 ) ) # Internally used method to read a notification def _listen_notification(self, data): upnp_header, upnp_body = data.split("\r\n\r\n") # Get header by line -- as fragments upnp_hfrags = upnp_header.split("\r\n") upnp_hfrags.pop(0) header = {} for frag in upnp_hfrags: splitpoint = frag.find(": ") header[frag[:splitpoint]] = frag[splitpoint+2:] if("USN" in header): if("CetonFactory" in header["USN"]): if(header["USN"] not in self.devices): self.devices[header["USN"]] = header["USN"] print("Found InfiniTV. Location URL: {0}".format(header["LOCATION"])) def listen(self): import select self.listening = True while self.listening: upnp_data = self.sock.recv(10240).decode('utf-8') #Filter by type (we only care about NOTIFY for now) if(upnp_data[0:6] == "NOTIFY"): self._listen_notification(upnp_data) def stop_listen(self): self.listening = False if len(self.devices) == 0: print "No InfiniTV found" if __name__ == "__main__": # Create a default UPnP socket try: interface_ip=sys.argv[1] except: print "detect_infinitv $ip_address" sys.exit(1) try: timeout = float(sys.argv[2]) except: timeout=15 L = find_the_infiniTV(interface_ip=interface_ip) thread.start_new_thread(L.listen, ()) time.sleep(timeout) L.stop_listen()