Python3 LAN host detection using Scapy

This simple setup helps you keep track of devices on your network by running a background script that scans for new hosts every 30 seconds. If a new device shows up, it gets logged with its details—like MAC address, IP address, and the interface where it was detected—into a PostgreSQL database.

By setting it up as a systemd service, the script runs automatically in the background, so you don’t have to worry about it once it’s up and running. It’s a straightforward way to stay informed & keep track of devices attached to your local network.

Install Python Requirements
pip install python-daemon psycopg2-binary scapy
Setup the Postgres Table with the below
CREATE TABLE lan_hosts (
mac_address VARCHAR(17) PRIMARY KEY,
ip_address VARCHAR(15),
first_detected TIMESTAMP,
last_seen TIMESTAMP
);
Create the script /opt/lan_host_detector.py
import os
import time
import psycopg2
from scapy.all import ARP, Ether, srp
from datetime import datetime
import daemon

# PostgreSQL connection parameters from environment variables
DB_HOST = os.getenv("DB_HOST", "localhost")
DB_NAME = os.getenv("DB_NAME", "your_db_name")
DB_USER = os.getenv("DB_USER", "your_db_user")
DB_PASS = os.getenv("DB_PASS", "your_db_password")

# List of LAN interfaces and network to scan
INTERFACES = ["interface1", "interface2"] # Replace with your interfaces
NETWORK = "192.168.1.0/24" # Replace with your network

def get_db_connection():
return psycopg2.connect(
host=DB_HOST,
database=DB_NAME,
user=DB_USER,
password=DB_PASS
)

def perform_arpscan(interface):
arp_request = ARP(pdst=NETWORK)
broadcast = Ether(dst="ff:ff:ff:ff:ff:ff")
arp_request_broadcast = broadcast / arp_request
answered_list = srp(arp_request_broadcast, timeout=2, iface=interface, verbose=False)[0]

hosts = []
for element in answered_list:
host = {
"ip": element[1].psrc,
"mac": element[1].hwsrc,
"interface": interface
}
hosts.append(host)

return hosts

def store_or_update_host(mac_address, ip_address, interface):
conn = get_db_connection()
cursor = conn.cursor()

cursor.execute("SELECT mac_address FROM lan_hosts WHERE mac_address = %s AND interface = %s", (mac_address, interface))
result = cursor.fetchone()

now = datetime.now()

if result:
# Update the last seen timestamp for known hosts
cursor.execute(
"UPDATE lan_hosts SET last_seen = %s, ip_address = %s WHERE mac_address = %s AND interface = %s",
(now, ip_address, mac_address, interface)
)
else:
# Insert new host into the db
cursor.execute(
"INSERT INTO lan_hosts (mac_address, ip_address, interface, first_detected, last_seen) VALUES (%s, %s, %s, %s, %s)",
(mac_address, ip_address, interface, now, now)
)
print(f"New host detected: {mac_address} ({ip_address}) on interface {interface}")

conn.commit()
cursor.close()
conn.close()

def run():
while True:
for interface in INTERFACES:
hosts = perform_arpscan(interface)

for host in hosts:
store_or_update_host(host["mac"], host["ip"], host["interface"])

# Sleep for 30 secs
time.sleep(30)

if __name__ == "__main__":
with daemon.DaemonContext():
run()
Setup the systemd service
echo "[Unit]
Description=LAN Host Detector

[Service]
ExecStart=/usr/bin/python3 /opt/lan_host_detector.py
Restart=always

[Install]
WantedBy=multi-user.target" | sudo tee /etc/systemd/system/lan_host_detector.service > /dev/null
Install & Start the service
sudo systemctl daemon-reload
sudo systemctl start lan_host_detector.service
sudo systemctl enable lan_host_detector.service

Leave a Reply

Your email address will not be published. Required fields are marked *