Overview
This was a bit of experimenting using bgptools & shodan to try see if i could get a quick way of assessing a block of IP’s or an entire ASN for open ports & vulnerable services. Later variants of this use the authenticated shodan API, and integrate RBL checks. It’s crude code, with very little error handling. Figure someone might get a kick out of it ๐
Sample Output
Here’s a sample i took of a random internet IP that showed some interesting data. For legal reasons, i have commented out the IP.
{
"X.X.X.X": {
"cpes": [
"cpe:/a:openssl:openssl:1.1.1f",
"cpe:/a:openbsd:openssh"
],
"hostnames": [],
"ip": "X.X.X.X",
"ports": [
22,
27017
],
"tags": [
"compromised",
"database",
"eol-product"
],
"vulns": [
"CVE-2024-0727",
"CVE-2019-0190",
"CVE-2021-4160",
"CVE-2021-23841",
"CVE-2021-23840",
"CVE-2009-3767",
"CVE-2021-3449",
"CVE-2023-0215",
"CVE-2023-0464",
"CVE-2022-1292",
"CVE-2023-2650",
"CVE-2023-0286",
"CVE-2022-0778",
"CVE-2022-4450",
"CVE-2022-2097",
"CVE-2009-3765",
"CVE-2022-2068",
"CVE-2023-5678",
"CVE-2022-4304",
"CVE-2020-1967",
"CVE-2020-1971",
"CVE-2021-3711",
"CVE-2023-0466",
"CVE-2023-4807",
"CVE-2009-1390",
"CVE-2009-3766",
"CVE-2023-0465",
"CVE-2023-3817",
"CVE-2021-3712"
]
}
}
The SubnetIPScanner
class performs the following tasks:
- Generates a list of IP addresses from a given subnet.
- Fetches and processes data for each IP from an external API.
- Downloads and updates BGP table data.
- Extracts BGP prefixes for a specific ASN (Autonomous System Number).
- Saves scan results to a JSON file.
Let’s dive into each part of the code to understand its functionality.
Class Initialization
class SubnetIPScanner:
def __init__(self, subnet=None):
self.subnet = subnet
self.api_url = "https://internetdb.shodan.io/"
self.results = {}
self.bgp_file = "bgp_table.jsonl"
self.bgp_url = "https://bgp.tools/table.jsonl"
self.user_agent = "Keith Rose - <email_address>"
The __init__
method sets up initial parameters for the scanner:
subnet
: The subnet to scan, given as a CIDR notation.api_url
: The base URL for querying IP data.results
: A dictionary to store scan results.bgp_file
: The filename for storing the BGP table.bgp_url
: The URL to download the BGP table.user_agent
: Custom user-agent string for HTTP requests. This is required to ‘authenticate’ ourself to bgp.tools incase our script causes a problem on their platform.
IP List Generation
def get_ip_list(self):
try:
network = ipaddress.ip_network(self.subnet, strict=False)
return [str(ip) for ip in network.hosts()]
except ValueError as e:
print(f"Error: {e}")
return []
The get_ip_list
method uses the ipaddress
module to generate a list of IP addresses from the provided subnet. It handles exceptions gracefully, ensuring robustness in case of invalid subnet formats.
API Call for IP Data
def call_api_for_ip(self, ip):
try:
response = requests.get(f"{self.api_url}{ip}")
if response.status_code == 200:
return response.json()
else:
return None
except requests.RequestException as e:
print(f"Error: {e}")
return None
call_api_for_ip
performs a GET request to the Shodan InternetDB API for a given IP address. It checks the response status and returns the JSON data if the request is successful.
Scanning Subnet
def scan_subnet(self):
ip_list = self.get_ip_list()
for ip in ip_list:
result = self.call_api_for_ip(ip)
if result is not None:
self.results[ip] = result
The scan_subnet
method iterates over the list of IPs and collects data for each IP address, storing the results in the results
dict.
Downloading and Parsing BGP Table
def download_bgp_table(self):
if not os.path.exists(self.bgp_file) or \
(time.time() - os.path.getmtime(self.bgp_file)) > 3600:
headers = {'User-Agent': self.user_agent}
try:
response = requests.get(self.bgp_url, headers=headers)
if response.status_code == 200:
with open(self.bgp_file, 'wb') as f:
f.write(response.content)
print(f"Downloaded {self.bgp_file}")
else:
print(f"Error: Received status code {response.status_code} for BGP table download")
except requests.RequestException as e:
print(f"Error: {e}")
else:
print(f"{self.bgp_file} is up to date")
download_bgp_table
checks if the BGP table file needs updating (if itโs more than an hour old) and downloads it if necessary. This ensures that the BGP data remains current while avoiding wasting bandwidth.
def get_prefixes_for_asn(self, asn):
prefixes = []
self.download_bgp_table()
try:
with open(self.bgp_file, 'r') as f:
for line in f:
entry = json.loads(line.strip())
if entry.get('ASN') == asn:
prefixes.append(entry.get('CIDR'))
except (IOError, json.JSONDecodeError) as e:
print(f"Error: {e}")
return prefixes
The get_prefixes_for_asn
method extracts BGP prefixes associated with a specific ASN from the downloaded BGP table.
Saving Results to JSON
def save_results_to_json(self, filename):
try:
with open(filename, 'w') as json_file:
json.dump(self.results, json_file, indent=4)
print(f"Results saved to {filename}")
except IOError as e:
print(f"Error: {e}")
Finally, save_results_to_json
saves the collected scan results to a JSON file for future reference.
Example Usage
if __name__ == "__main__":
subnet = "154.119.80.15/32" # Example subnet
asn = 207784 # Example ASN
scanner = SubnetIPScanner(subnet)
scanner.asn = asn
scanner.scan_subnet()
scanner.save_results_to_json("results.json")
In the example usage section, an instance of SubnetIPScanner
is created, a subnet is scanned, and results are saved to a file. Additionally, the script shows how to fetch prefixes for a specific ASN (commented out).
Complete Script
import requests
import ipaddress
import json
import os
import time
from datetime import datetime
class SubnetIPScanner:
def __init__(self, subnet=None):
self.subnet = subnet
self.api_url = "https://internetdb.shodan.io/"
self.results = {}
self.bgp_file = "bgp_table.jsonl"
self.bgp_url = "https://bgp.tools/table.jsonl"
self.user_agent = "Keith Rose - me@keithro.se"
def get_ip_list(self):
try:
network = ipaddress.ip_network(self.subnet, strict=False)
return [str(ip) for ip in network.hosts()]
except ValueError as e:
print(f"Error: {e}")
return []
def call_api_for_ip(self, ip):
try:
response = requests.get(f"{self.api_url}{ip}")
if response.status_code == 200:
return response.json()
else:
#print(f"Error: Received status code {response.status_code} for IP {ip}")
return None
except requests.RequestException as e:
print(f"Error: {e}")
return None
def scan_subnet(self):
ip_list = self.get_ip_list()
for ip in ip_list:
result = self.call_api_for_ip(ip)
if result is not None:
self.results[ip] = result
print(result)
def scan_asn(self):
prefixes = self.get_prefixes_for_asn(self.asn)
res = {}
for prefix in prefixes:
self.subnet = prefix
res['prefix'] = self.scan_subnet()
return res
def download_bgp_table(self):
if not os.path.exists(self.bgp_file) or \
(time.time() - os.path.getmtime(self.bgp_file)) > 3600:
headers = {'User-Agent': self.user_agent}
try:
print('test')
response = requests.get(self.bgp_url, headers=headers)
if response.status_code == 200:
with open(self.bgp_file, 'wb') as f:
f.write(response.content)
print(f"Downloaded {self.bgp_file}")
else:
print(f"Error: Received status code {response.status_code} for BGP table download")
except requests.RequestException as e:
print(f"Error: {e}")
else:
print(f"{self.bgp_file} is up to date")
def get_prefixes_for_asn(self, asn):
prefixes = []
self.download_bgp_table()
try:
with open(self.bgp_file, 'r') as f:
for line in f:
entry = json.loads(line.strip())
if entry.get('ASN') == asn:
prefixes.append(entry.get('CIDR'))
except (IOError, json.JSONDecodeError) as e:
print(f"Error: {e}")
return prefixes
def save_results_to_json(self, filename):
try:
with open(filename, 'w') as json_file:
json.dump(self.results, json_file, indent=4)
print(f"Results saved to {filename}")
except IOError as e:
print(f"Error: {e}")
# Example usage:
if __name__ == "__main__":
subnet = "X.X.X.X/32" # Example subnet
asn = 15169 # Example ASN (Google) - Do NOT scan googles entire AS
scanner = SubnetIPScanner(subnet)
scanner.asn = asn
scanner.scan_subnet()
scanner.save_results_to_json("results.json")
# Fetching prefixes for a given ASN
#prefixes = scanner.get_prefixes_for_asn(asn)
#print(f"Prefixes for ASN {asn}: {prefixes}")
#print(scanner.scan_asn())