Integrating BGP Tools & Shodan’s Internet DB


This was a bit of experimenting using bgptools & shodan to try see if i could get a quick way of assessing a block of IP’s or an entire ASN for open ports & vulnerable services. Later variants of this use the authenticated shodan API, and integrate RBL checks. It’s crude code, with very little error handling. Figure someone might get a kick out of it ๐Ÿ™‚

Sample Output

Here’s a sample i took of a random internet IP that showed some interesting data. For legal reasons, i have commented out the IP.

"X.X.X.X": {
"cpes": [
"hostnames": [],
"ip": "X.X.X.X",
"ports": [
"tags": [
"vulns": [

The SubnetIPScanner class performs the following tasks:

  1. Generates a list of IP addresses from a given subnet.
  2. Fetches and processes data for each IP from an external API.
  3. Downloads and updates BGP table data.
  4. Extracts BGP prefixes for a specific ASN (Autonomous System Number).
  5. Saves scan results to a JSON file.

Let’s dive into each part of the code to understand its functionality.

Class Initialization

class SubnetIPScanner:
def __init__(self, subnet=None):
self.subnet = subnet
self.api_url = ""
self.results = {}
self.bgp_file = "bgp_table.jsonl"
self.bgp_url = ""
self.user_agent = "Keith Rose - <email_address>"

The __init__ method sets up initial parameters for the scanner:

  • subnet: The subnet to scan, given as a CIDR notation.
  • api_url: The base URL for querying IP data.
  • results: A dictionary to store scan results.
  • bgp_file: The filename for storing the BGP table.
  • bgp_url: The URL to download the BGP table.
  • user_agent: Custom user-agent string for HTTP requests. This is required to ‘authenticate’ ourself to incase our script causes a problem on their platform.

IP List Generation

def get_ip_list(self):
network = ipaddress.ip_network(self.subnet, strict=False)
return [str(ip) for ip in network.hosts()]
except ValueError as e:
print(f"Error: {e}")
return []

The get_ip_list method uses the ipaddress module to generate a list of IP addresses from the provided subnet. It handles exceptions gracefully, ensuring robustness in case of invalid subnet formats.

API Call for IP Data

def call_api_for_ip(self, ip):
response = requests.get(f"{self.api_url}{ip}")
if response.status_code == 200:
return response.json()
return None
except requests.RequestException as e:
print(f"Error: {e}")
return None

call_api_for_ip performs a GET request to the Shodan InternetDB API for a given IP address. It checks the response status and returns the JSON data if the request is successful.

Scanning Subnet

def scan_subnet(self):
ip_list = self.get_ip_list()
for ip in ip_list:
result = self.call_api_for_ip(ip)
if result is not None:
self.results[ip] = result

The scan_subnet method iterates over the list of IPs and collects data for each IP address, storing the results in the results dict.

Downloading and Parsing BGP Table

def download_bgp_table(self):
if not os.path.exists(self.bgp_file) or \
(time.time() - os.path.getmtime(self.bgp_file)) > 3600:
headers = {'User-Agent': self.user_agent}

response = requests.get(self.bgp_url, headers=headers)
if response.status_code == 200:
with open(self.bgp_file, 'wb') as f:
print(f"Downloaded {self.bgp_file}")
print(f"Error: Received status code {response.status_code} for BGP table download")
except requests.RequestException as e:
print(f"Error: {e}")
print(f"{self.bgp_file} is up to date")

download_bgp_table checks if the BGP table file needs updating (if itโ€™s more than an hour old) and downloads it if necessary. This ensures that the BGP data remains current while avoiding wasting bandwidth.

def get_prefixes_for_asn(self, asn):
prefixes = []
with open(self.bgp_file, 'r') as f:
for line in f:
entry = json.loads(line.strip())
if entry.get('ASN') == asn:
except (IOError, json.JSONDecodeError) as e:
print(f"Error: {e}")
return prefixes

The get_prefixes_for_asn method extracts BGP prefixes associated with a specific ASN from the downloaded BGP table.

Saving Results to JSON

def save_results_to_json(self, filename):
with open(filename, 'w') as json_file:
json.dump(self.results, json_file, indent=4)
print(f"Results saved to {filename}")
except IOError as e:
print(f"Error: {e}")

Finally, save_results_to_json saves the collected scan results to a JSON file for future reference.

Example Usage

if __name__ == "__main__":
subnet = "" # Example subnet
asn = 207784 # Example ASN
scanner = SubnetIPScanner(subnet)
scanner.asn = asn

In the example usage section, an instance of SubnetIPScanner is created, a subnet is scanned, and results are saved to a file. Additionally, the script shows how to fetch prefixes for a specific ASN (commented out).

Complete Script

import requests
import ipaddress
import json
import os
import time
from datetime import datetime

class SubnetIPScanner:
def __init__(self, subnet=None):
self.subnet = subnet
self.api_url = ""
self.results = {}
self.bgp_file = "bgp_table.jsonl"
self.bgp_url = ""
self.user_agent = "Keith Rose -"

def get_ip_list(self):
network = ipaddress.ip_network(self.subnet, strict=False)
return [str(ip) for ip in network.hosts()]
except ValueError as e:
print(f"Error: {e}")
return []

def call_api_for_ip(self, ip):
response = requests.get(f"{self.api_url}{ip}")
if response.status_code == 200:
return response.json()
#print(f"Error: Received status code {response.status_code} for IP {ip}")
return None
except requests.RequestException as e:
print(f"Error: {e}")
return None

def scan_subnet(self):
ip_list = self.get_ip_list()
for ip in ip_list:
result = self.call_api_for_ip(ip)
if result is not None:
self.results[ip] = result

def scan_asn(self):
prefixes = self.get_prefixes_for_asn(self.asn)
res = {}
for prefix in prefixes:
self.subnet = prefix
res['prefix'] = self.scan_subnet()
return res

def download_bgp_table(self):
if not os.path.exists(self.bgp_file) or \
(time.time() - os.path.getmtime(self.bgp_file)) > 3600:
headers = {'User-Agent': self.user_agent}


response = requests.get(self.bgp_url, headers=headers)
if response.status_code == 200:
with open(self.bgp_file, 'wb') as f:
print(f"Downloaded {self.bgp_file}")
print(f"Error: Received status code {response.status_code} for BGP table download")
except requests.RequestException as e:
print(f"Error: {e}")
print(f"{self.bgp_file} is up to date")

def get_prefixes_for_asn(self, asn):
prefixes = []
with open(self.bgp_file, 'r') as f:
for line in f:
entry = json.loads(line.strip())
if entry.get('ASN') == asn:
except (IOError, json.JSONDecodeError) as e:
print(f"Error: {e}")
return prefixes

def save_results_to_json(self, filename):
with open(filename, 'w') as json_file:
json.dump(self.results, json_file, indent=4)
print(f"Results saved to {filename}")
except IOError as e:
print(f"Error: {e}")

# Example usage:
if __name__ == "__main__":
subnet = "X.X.X.X/32" # Example subnet
asn = 15169 # Example ASN (Google) - Do NOT scan googles entire AS
scanner = SubnetIPScanner(subnet)
scanner.asn = asn

# Fetching prefixes for a given ASN
#prefixes = scanner.get_prefixes_for_asn(asn)
#print(f"Prefixes for ASN {asn}: {prefixes}")

Leave a Reply

Your email address will not be published. Required fields are marked *