Integrating BGP Tools & Shodan’s Internet DB

Overview

This was a bit of experimenting with bgp.tools and Shodan to see if I could get a quick way of assessing a block of IPs or an entire ASN for open ports and vulnerable services. Later variants of this use the authenticated Shodan API and integrate RBL checks. It’s crude code, with very little error handling. I figure someone might get a kick out of it 🙂

Sample Output

Here’s a sample I took of a random internet IP that showed some interesting data. For legal reasons, I have redacted the IP.

{
"X.X.X.X": {
"cpes": [
"cpe:/a:openssl:openssl:1.1.1f",
"cpe:/a:openbsd:openssh"
],
"hostnames": [],
"ip": "X.X.X.X",
"ports": [
22,
27017
],
"tags": [
"compromised",
"database",
"eol-product"
],
"vulns": [
"CVE-2024-0727",
"CVE-2019-0190",
"CVE-2021-4160",
"CVE-2021-23841",
"CVE-2021-23840",
"CVE-2009-3767",
"CVE-2021-3449",
"CVE-2023-0215",
"CVE-2023-0464",
"CVE-2022-1292",
"CVE-2023-2650",
"CVE-2023-0286",
"CVE-2022-0778",
"CVE-2022-4450",
"CVE-2022-2097",
"CVE-2009-3765",
"CVE-2022-2068",
"CVE-2023-5678",
"CVE-2022-4304",
"CVE-2020-1967",
"CVE-2020-1971",
"CVE-2021-3711",
"CVE-2023-0466",
"CVE-2023-4807",
"CVE-2009-1390",
"CVE-2009-3766",
"CVE-2023-0465",
"CVE-2023-3817",
"CVE-2021-3712"
]
}
}

The SubnetIPScanner class performs the following tasks:

  1. Generates a list of IP addresses from a given subnet.
  2. Fetches and processes data for each IP from an external API.
  3. Downloads and updates BGP table data.
  4. Extracts BGP prefixes for a specific ASN (Autonomous System Number).
  5. Saves scan results to a JSON file.

Let’s dive into each part of the code to understand its functionality.

Class Initialization

class SubnetIPScanner:
    """Scanner that looks up the hosts of a subnet in Shodan's InternetDB."""

    def __init__(self, subnet=None):
        """Store the target subnet and the fixed endpoints and filenames used.

        Args:
            subnet: Network to scan in CIDR notation (e.g. "192.0.2.0/24"),
                or None if it will be assigned later.
        """
        self.subnet = subnet
        # Base URL; the IP address is appended directly to build a lookup URL.
        self.api_url = "https://internetdb.shodan.io/"
        # Scan results, keyed by IP address string.
        self.results = {}
        # Local cache file and remote source of the BGP table (JSON Lines).
        self.bgp_file = "bgp_table.jsonl"
        self.bgp_url = "https://bgp.tools/table.jsonl"
        # Sent as the User-Agent header when downloading the BGP table.
        self.user_agent = "Keith Rose - <email_address>"

The __init__ method sets up initial parameters for the scanner:

  • subnet: The subnet to scan, given as a CIDR notation.
  • api_url: The base URL for querying IP data.
  • results: A dictionary to store scan results.
  • bgp_file: The filename for storing the BGP table.
  • bgp_url: The URL to download the BGP table.
  • user_agent: Custom user-agent string for HTTP requests. This is required to ‘authenticate’ ourselves to bgp.tools in case our script causes a problem on their platform.

IP List Generation

def get_ip_list(self):
    """Return every host address in ``self.subnet`` as a list of strings.

    Returns:
        A list of IP address strings, or an empty list if ``self.subnet``
        is not a valid network (the error is printed, not raised).
    """
    try:
        # strict=False accepts an address with host bits set (e.g.
        # 10.0.0.5/24) and masks them off instead of raising.
        network = ipaddress.ip_network(self.subnet, strict=False)
    except ValueError as e:
        print(f"Error: {e}")
        return []

    # A single-address network (/32 or /128) has exactly one host. Returning
    # it explicitly keeps the result independent of how hosts() treats that
    # case (NOTE(review): older Python releases reportedly yield nothing here).
    if network.num_addresses == 1:
        return [str(network.network_address)]
    return [str(ip) for ip in network.hosts()]

The get_ip_list method uses the ipaddress module to generate a list of IP addresses from the provided subnet. It handles exceptions gracefully, ensuring robustness in case of invalid subnet formats.

API Call for IP Data

def call_api_for_ip(self, ip):
    """Look up one IP address and return the decoded JSON response.

    Args:
        ip: IP address string; it is appended to ``self.api_url``.

    Returns:
        The parsed JSON body on HTTP 200; None on any other status code,
        on a request error, or if the body is not valid JSON.
    """
    try:
        # Without a timeout, one unresponsive connection would stall the
        # whole scan indefinitely.
        response = requests.get(f"{self.api_url}{ip}", timeout=10)
        if response.status_code != 200:
            return None
        return response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a 200 response whose body fails to decode as JSON.
        print(f"Error: {e}")
        return None

call_api_for_ip performs a GET request to the Shodan InternetDB API for a given IP address. It checks the response status and returns the JSON data if the request is successful.

Scanning Subnet

def scan_subnet(self):
    """Query every host in ``self.subnet`` and record the responses.

    Each address for which ``call_api_for_ip`` returns data is stored in
    ``self.results`` under its IP string; addresses without data are skipped.
    """
    for ip in self.get_ip_list():
        result = self.call_api_for_ip(ip)
        if result is not None:
            self.results[ip] = result

The scan_subnet method iterates over the list of IPs and collects data for each IP address, storing the results in the results dict.

Downloading and Parsing BGP Table

def download_bgp_table(self):
    """Download the BGP table to ``self.bgp_file`` unless a fresh copy exists.

    The local file is reused when it exists and was modified within the last
    hour. Request failures and non-200 responses are printed, not raised, and
    leave any existing file untouched.
    """
    max_age_seconds = 3600
    is_fresh = (
        os.path.exists(self.bgp_file)
        and (time.time() - os.path.getmtime(self.bgp_file)) <= max_age_seconds
    )
    if is_fresh:
        print(f"{self.bgp_file} is up to date")
        return

    headers = {'User-Agent': self.user_agent}
    try:
        # A timeout keeps an unresponsive server from hanging the script.
        response = requests.get(self.bgp_url, headers=headers, timeout=60)
    except requests.RequestException as e:
        print(f"Error: {e}")
        return

    if response.status_code != 200:
        print(f"Error: Received status code {response.status_code} for BGP table download")
        return

    # Write to a sibling file and rename it into place: an interrupted write
    # must not leave a truncated table that then passes the freshness check.
    partial_path = f"{self.bgp_file}.part"
    with open(partial_path, 'wb') as f:
        f.write(response.content)
    os.replace(partial_path, self.bgp_file)
    print(f"Downloaded {self.bgp_file}")

download_bgp_table checks whether the BGP table file needs updating (if it is missing or more than an hour old) and downloads it if necessary. This keeps the BGP data current without wasting bandwidth.

def get_prefixes_for_asn(self, asn):
    """Return the CIDR prefixes that the BGP table lists for ``asn``.

    Refreshes the local BGP table first, then reads it as JSON Lines: one
    JSON object per line, matched on its 'ASN' key and read from its 'CIDR'
    key.

    Args:
        asn: Autonomous System Number, compared with ``==`` against each
            entry's 'ASN' value.

    Returns:
        A list of prefixes; empty if nothing matches or the table cannot be
        opened (the error is printed, not raised).
    """
    prefixes = []
    self.download_bgp_table()
    try:
        with open(self.bgp_file, 'r') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError as e:
                    # One corrupt record must not discard the rest of the table.
                    print(f"Error: {e}")
                    continue
                if not isinstance(entry, dict) or entry.get('ASN') != asn:
                    continue
                cidr = entry.get('CIDR')
                if cidr is not None:
                    prefixes.append(cidr)
    except IOError as e:
        print(f"Error: {e}")
    return prefixes

The get_prefixes_for_asn method extracts BGP prefixes associated with a specific ASN from the downloaded BGP table.

Saving Results to JSON

def save_results_to_json(self, filename):
    """Write ``self.results`` to ``filename`` as indented JSON.

    Args:
        filename: Path of the file to create or overwrite.

    I/O errors are printed rather than raised.
    """
    try:
        with open(filename, 'w') as json_file:
            json.dump(self.results, json_file, indent=4)
    except IOError as e:
        print(f"Error: {e}")
    else:
        # Only report success once the file has been written and closed.
        print(f"Results saved to {filename}")

Finally, save_results_to_json saves the collected scan results to a JSON file for future reference.

Example Usage

if __name__ == "__main__":
    subnet = "154.119.80.15/32"  # Example subnet
    asn = 207784  # Example ASN
    scanner = SubnetIPScanner(subnet)
    # The ASN is only read by an ASN-wide scan; the subnet scan below ignores it.
    scanner.asn = asn
    scanner.scan_subnet()
    scanner.save_results_to_json("results.json")

In the example usage section, an instance of SubnetIPScanner is created, a subnet is scanned, and the results are saved to a file. The complete script below additionally shows how to fetch prefixes for a specific ASN (commented out).

Complete Script

import requests
import ipaddress
import json
import os
import time
from datetime import datetime

class SubnetIPScanner:
    """Query Shodan's InternetDB for every host in a subnet or in an ASN's prefixes.

    Results accumulate in ``self.results`` (keyed by IP string) across scans
    and can be written out with ``save_results_to_json``. ASN prefixes are read
    from a locally cached copy of the bgp.tools table.
    """

    def __init__(self, subnet=None, asn=None):
        """Store the scan targets and the fixed endpoints and filenames used.

        Args:
            subnet: Network to scan in CIDR notation, or None to set it later.
            asn: Autonomous System Number for ``scan_asn``, or None to set it
                later via the ``asn`` attribute.
        """
        self.subnet = subnet
        self.asn = asn
        # Base URL; the IP address is appended directly to build a lookup URL.
        self.api_url = "https://internetdb.shodan.io/"
        self.results = {}
        # Local cache file and remote source of the BGP table (JSON Lines).
        self.bgp_file = "bgp_table.jsonl"
        self.bgp_url = "https://bgp.tools/table.jsonl"
        # Sent as the User-Agent header when downloading the BGP table.
        self.user_agent = "Keith Rose - me@keithro.se"

    def get_ip_list(self):
        """Return every host address in ``self.subnet`` as a list of strings.

        Returns:
            A list of IP address strings, or an empty list if ``self.subnet``
            is not a valid network (the error is printed, not raised).
        """
        try:
            # strict=False accepts an address with host bits set (e.g.
            # 10.0.0.5/24) and masks them off instead of raising.
            network = ipaddress.ip_network(self.subnet, strict=False)
        except ValueError as e:
            print(f"Error: {e}")
            return []

        # A single-address network (/32 or /128) has exactly one host.
        # Returning it explicitly keeps the result independent of how hosts()
        # treats that case (NOTE(review): older Python releases reportedly
        # yield nothing here).
        if network.num_addresses == 1:
            return [str(network.network_address)]
        return [str(ip) for ip in network.hosts()]

    def call_api_for_ip(self, ip):
        """Look up one IP address and return the decoded JSON response.

        Args:
            ip: IP address string; it is appended to ``self.api_url``.

        Returns:
            The parsed JSON body on HTTP 200; None on any other status code,
            on a request error, or if the body is not valid JSON.
        """
        try:
            # Without a timeout, one unresponsive connection would stall the
            # whole scan indefinitely.
            response = requests.get(f"{self.api_url}{ip}", timeout=10)
            if response.status_code != 200:
                # Deliberately silent. NOTE(review): presumably the API
                # answers non-200 for addresses it holds no data on, which
                # would be most of a scan — confirm.
                return None
            return response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a 200 response whose body fails to decode.
            print(f"Error: {e}")
            return None

    def scan_subnet(self):
        """Query every host in ``self.subnet``, printing and recording each hit.

        Returns:
            A dict of the results found for this subnet only, keyed by IP
            string. The same entries are also merged into ``self.results``.
        """
        found = {}
        for ip in self.get_ip_list():
            result = self.call_api_for_ip(ip)
            if result is not None:
                self.results[ip] = result
                found[ip] = result
                print(result)
        return found

    def scan_asn(self):
        """Scan every prefix the BGP table lists for ``self.asn``.

        Returns:
            A dict mapping each prefix to the per-IP results found in it;
            empty if no ASN is set or no prefixes are found.
        """
        if self.asn is None:
            print("Error: no ASN set")
            return {}

        res = {}
        # scan_subnet reads self.subnet, so it is pointed at each prefix in
        # turn and then restored so the caller's own subnet is not lost.
        original_subnet = self.subnet
        try:
            for prefix in self.get_prefixes_for_asn(self.asn):
                self.subnet = prefix
                res[prefix] = self.scan_subnet()
        finally:
            self.subnet = original_subnet
        return res

    def download_bgp_table(self):
        """Download the BGP table to ``self.bgp_file`` unless a fresh copy exists.

        The local file is reused when it exists and was modified within the
        last hour. Request failures and non-200 responses are printed, not
        raised, and leave any existing file untouched.
        """
        max_age_seconds = 3600
        is_fresh = (
            os.path.exists(self.bgp_file)
            and (time.time() - os.path.getmtime(self.bgp_file)) <= max_age_seconds
        )
        if is_fresh:
            print(f"{self.bgp_file} is up to date")
            return

        headers = {'User-Agent': self.user_agent}
        try:
            # A timeout keeps an unresponsive server from hanging the script.
            response = requests.get(self.bgp_url, headers=headers, timeout=60)
        except requests.RequestException as e:
            print(f"Error: {e}")
            return

        if response.status_code != 200:
            print(f"Error: Received status code {response.status_code} for BGP table download")
            return

        # Write to a sibling file and rename it into place: an interrupted
        # write must not leave a truncated table that then passes the
        # freshness check.
        partial_path = f"{self.bgp_file}.part"
        with open(partial_path, 'wb') as f:
            f.write(response.content)
        os.replace(partial_path, self.bgp_file)
        print(f"Downloaded {self.bgp_file}")

    def get_prefixes_for_asn(self, asn):
        """Return the CIDR prefixes that the BGP table lists for ``asn``.

        Refreshes the local BGP table first, then reads it as JSON Lines: one
        JSON object per line, matched on its 'ASN' key and read from its
        'CIDR' key.

        Args:
            asn: Autonomous System Number, compared with ``==`` against each
                entry's 'ASN' value.

        Returns:
            A list of prefixes; empty if nothing matches or the table cannot
            be opened (the error is printed, not raised).
        """
        prefixes = []
        self.download_bgp_table()
        try:
            with open(self.bgp_file, 'r') as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        entry = json.loads(line)
                    except json.JSONDecodeError as e:
                        # One corrupt record must not discard the whole table.
                        print(f"Error: {e}")
                        continue
                    if not isinstance(entry, dict) or entry.get('ASN') != asn:
                        continue
                    cidr = entry.get('CIDR')
                    if cidr is not None:
                        prefixes.append(cidr)
        except IOError as e:
            print(f"Error: {e}")
        return prefixes

    def save_results_to_json(self, filename):
        """Write ``self.results`` to ``filename`` as indented JSON.

        Args:
            filename: Path of the file to create or overwrite.

        I/O errors are printed rather than raised.
        """
        try:
            with open(filename, 'w') as json_file:
                json.dump(self.results, json_file, indent=4)
        except IOError as e:
            print(f"Error: {e}")
        else:
            # Only report success once the file has been written and closed.
            print(f"Results saved to {filename}")

# Example usage:
if __name__ == "__main__":
    # Placeholder: replace with a real CIDR. As written it is not a valid
    # network, so get_ip_list prints an error and nothing is scanned.
    subnet = "X.X.X.X/32"  # Example subnet
    asn = 15169  # Example ASN (Google) - Do NOT scan Google's entire AS
    scanner = SubnetIPScanner(subnet)
    scanner.asn = asn
    scanner.scan_subnet()
    scanner.save_results_to_json("results.json")

    # Fetching prefixes for a given ASN
    #prefixes = scanner.get_prefixes_for_asn(asn)
    #print(f"Prefixes for ASN {asn}: {prefixes}")
    #print(scanner.scan_asn())

Leave a Reply

Your email address will not be published. Required fields are marked *