from scapy.all import sniff, ARP, DHCP, DNS, DNSQR, UDP, IP, Ether, BOOTP import urllib.request import threading import sqlite3 import datetime import time devices = {} mac_vendors = {} api_lock = threading.Lock() # ----- DATABASE FUNCTIONS ----- def init_db(): # Initialize SQLite database and create tables for devices, DNS queries, and services conn = sqlite3.connect('scanner.db') c = conn.cursor() c.execute('''CREATE TABLE IF NOT EXISTS devices (mac TEXT PRIMARY KEY, ip TEXT, vendor TEXT, hostname TEXT, last_seen TEXT)''') c.execute('''CREATE TABLE IF NOT EXISTS dns_queries (ip TEXT, domain TEXT, timestamp TEXT)''') c.execute('''CREATE TABLE IF NOT EXISTS services (ip TEXT, service_name TEXT, timestamp TEXT)''') conn.commit() conn.close() def log_device(mac, ip, vendor, hostname): # Insert or update device information in the database conn = sqlite3.connect('scanner.db') c = conn.cursor() now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") c.execute('''INSERT INTO devices (mac, ip, vendor, hostname, last_seen) VALUES (?, ?, ?, ?, ?) ON CONFLICT(mac) DO UPDATE SET ip=excluded.ip, vendor=excluded.vendor, hostname=excluded.hostname, last_seen=excluded.last_seen''', (mac, ip, vendor, hostname, now)) conn.commit() conn.close() def log_dns(ip, domain): # Log DNS query from device to database conn = sqlite3.connect('scanner.db') c = conn.cursor() now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") c.execute("INSERT INTO dns_queries (ip, domain, timestamp) VALUES (?, ?, ?)", (ip, domain, now)) conn.commit() conn.close() def log_service(ip, service_name): # Log discovered service (mDNS) to database conn = sqlite3.connect('scanner.db') c = conn.cursor() now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") c.execute("INSERT INTO services (ip, service_name, timestamp) VALUES (?, ?, ?)", (ip, service_name, now)) conn.commit() conn.close() # ----- NETWORK LISTENING FUNCTIONS ----- def is_random_mac(mac): # Check if MAC is locally administered (random/spoofed) # Get first byte and convert to integer first_byte = int(mac[:2], 16) # Check if second bit (Locally Administered bit) is 1 return (first_byte & 2) == 2 def get_mac_vendor(mac, ip): # Fetch vendor name from API using MAC prefix with thread-safe locking prefix = mac[:8].upper() if is_random_mac(mac): vendor = "Random/Hidden MAC" mac_vendors[prefix] = vendor elif prefix not in mac_vendors: # LOCK START: Only 1 thread can query API at a time, others wait with api_lock: try: # If another thread already resolved this vendor while waiting, skip the API call if prefix in mac_vendors: vendor = mac_vendors[prefix] else: url = f"https://api.macvendors.com/{mac}" req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'}) response = urllib.request.urlopen(req, timeout=3) vendor = response.read().decode('utf-8') mac_vendors[prefix] = vendor time.sleep(1.2) # Sleep to respect API rate limit except urllib.error.HTTPError as e: if e.code == 429: vendor = "API Limit" else: vendor = "Unknown" mac_vendors[prefix] = vendor except Exception: vendor = "Unknown" mac_vendors[prefix] = vendor # LOCK END else: vendor = mac_vendors[prefix] print(f"[+] New Device: {ip} ({mac}) -> Vendor: {vendor}") # FIX: Guard against race condition — devices[mac] may not exist if called unexpectedly device = devices.get(mac) hostname = device.get('hostname', '') if device else '' log_device(mac, ip, vendor, hostname) def process_packet(packet): # Parse network packets and extract device info, DNS queries, and services if packet.haslayer(Ether) and packet.haslayer(IP): mac = packet[Ether].src ip = packet[IP].src # If this MAC hasn't been recorded before, capture it immediately if mac not in devices: devices[mac] = {'ip': ip, 'hostname': ''} threading.Thread(target=get_mac_vendor, args=(mac, ip)).start() # 1. ARP Detection (ARP packets don't contain IP layer, so we keep this separately) if packet.haslayer(ARP) and packet[ARP].op == 1: mac = packet[ARP].hwsrc ip = packet[ARP].psrc if mac not in devices: devices[mac] = {'ip': ip, 'hostname': ''} threading.Thread(target=get_mac_vendor, args=(mac, ip)).start() # 2. DHCP Detection (to extract hostname) if packet.haslayer(DHCP) and packet.haslayer(BOOTP): # op == 1 means: "This is a device IP request (Discover/Request)", not a server response if packet[BOOTP].op == 1: mac = packet[Ether].src for opt in packet[DHCP].options: if isinstance(opt, tuple) and opt[0] == 'hostname': hostname = opt[1].decode('utf-8') if mac in devices: devices[mac]['hostname'] = hostname # FIX: Use consistent "Unknown" fallback (was "Bilinmiyor" in Turkish) log_device(mac, devices[mac]['ip'], mac_vendors.get(mac[:8].upper(), "Unknown"), hostname) print(f"[*] Device Name: {mac} -> {hostname}") # 3. DNS Detection (Internet sites) if packet.haslayer(DNSQR) and packet.haslayer(IP): ip_src = packet[IP].src query = packet[DNSQR].qname.decode('utf-8') if not query.endswith('.local.'): print(f"[>] {ip_src} -> {query}") log_dns(ip_src, query) # 4. mDNS Detection (smart device services) if packet.haslayer(UDP) and packet[UDP].dport == 5353 and packet.haslayer(DNS) and packet.haslayer(IP): ip_src = packet[IP].src if packet[DNS].qd: qname = packet[DNS].qd.qname.decode('utf-8') if "_tcp" in qname or "_udp" in qname or "apple" in qname: print(f"[~] Service: {ip_src} -> {qname}") log_service(ip_src, qname) def start_sniffer(): # Initialize database and begin packet sniffing on the network print("Initializing database...") init_db() print("Advanced Listening Active (Any packet will be immediately added to MAC device list)...") sniff(prn=process_packet, filter="arp or (udp port 53) or (udp port 5353) or (udp port 67) or (udp port 68)", store=0) if __name__ == "__main__": start_sniffer()