Misc Tools
The following script can be used to compare the geolocation reported by
the probes submitting measurements compared to the geolocation of the
subnet the probe is coming from. It is meant to be run on
backend-fsn.ooni.org 🖥.
##!/usr/bin/env python3
from time import sleep
import systemd.journalimport geoip2.database # type: ignore
asnfn = "/var/lib/ooniapi/asn.mmdb"ccfn = "/var/lib/ooniapi/cc.mmdb"geoip_asn_reader = geoip2.database.Reader(asnfn)geoip_cc_reader = geoip2.database.Reader(ccfn)
def follow_journal(): journal = systemd.journal.Reader() #journal.seek_tail() journal.get_previous() journal.add_match(_SYSTEMD_UNIT="nginx.service") while True: try: event = journal.wait(-1) if event == systemd.journal.APPEND: for entry in journal: yield entry["MESSAGE"] except Exception as e: print(e) sleep(0.1)
def geolookup(ipaddr: str): cc = geoip_cc_reader.country(ipaddr).country.iso_code asn = geoip_asn_reader.asn(ipaddr).autonomous_system_number return cc, asn
def process(rawmsg): if ' "POST /report/' not in rawmsg: return msg = rawmsg.strip().split() ipaddr = msg[2] ipaddr2 = msg[3] path = msg[8][8:] tsamp, tn, probe_cc, probe_asn, collector, rand = path.split("_") geo_cc, geo_asn = geolookup(ipaddr) proxied = 0 probe_type = rawmsg.rsplit('"', 2)[-2] if "," in probe_type: return if ipaddr2 != "": proxied = 1 # Probably CloudFront, use second ipaddr geo_cc, geo_asn = geolookup(ipaddr2)
def main(): for msg in follow_journal(): if msg is None: break try: process(msg) except Exception as e: print(e) sleep(0.1)
if __name__ == "__main__": main()
Test list prioritization monitoring
The following script monitors prioritized test list for changes in URLs for a set of countries. Outputs StatsS metrics.
note The prioritization system has been modified to work on a granularity of probe_cc + probe_asn rather than whole countries.
Country-wise changes might be misleading. The script can be modified to filter for a set of CCs+ASNs.
##!/usr/bin/env python3
from time import sleepimport urllib.requestimport json
import statsd # debdeps: python3-statsd
metrics = statsd.StatsClient("", 8125, prefix="test-list-changes")
CCs = ["GE", "IT", "US"]THRESH = 100
def peek(cc, listmap) -> None: url = f"https://api.ooni.io/api/v1/test-list/urls?country_code={cc}&debug=True" res = urllib.request.urlopen(url) j = json.load(res) top = j["results"][:THRESH] # list of dicts top_urls = set(d["url"] for d in top)
if cc in listmap: old = listmap[cc] changed = old.symmetric_difference(top_urls) tot_cnt = len(old.union(top_urls)) changed_ratio = len(changed) / tot_cnt * 100 metrics.gauge(f"-{cc}", changed_ratio)
listmap[cc] = top_urls
def main() -> None: listmap = {} while True: for cc in CCs: try: peek(cc, listmap) except Exception as e: print(e) sleep(1) sleep(60 * 10)
if __name__ == "__main__": main()
Recompressing postcans on S3
The following script can be used to compress .tar.gz files in the S3 data bucket. It keeps a copy of the original files locally as a backup. It terminates once a correctly compressed file is found. Running the script on an AWS host close to the S3 bucket can significantly speed up the process.
Tested with the packages:
- python3-boto3 1.28.49+dfsg-1
- python3-magic 2:0.4.27-2
Set the ACCESS_KEY and SECRET_KEY environment variables. Update the PREFIX variable as needed.
##!/usr/bin/env python3from os import getenv, renamefrom sys import exitimport boto3import gzipimport magic
BUCKET_NAME = "ooni-data-eu-fra-test"## BUCKET_NAME = "ooni-data-eu-fra"PREFIX = "raw/2021"
def fetch_files(): s3 = boto3.client( "s3", aws_access_key_id=getenv("ACCESS_KEY"), aws_secret_access_key=getenv("SECRET_KEY"), ) cont_token = None while True: kw = {} if cont_token is None else dict(ContinuationToken=cont_token) r = s3.list_objects_v2(Bucket=BUCKET_NAME, Prefix=PREFIX, **kw) cont_token = r.get("NextContinuationToken", None) for i in r.get("Contents", []): k = i["Key"] if k.endswith(".tar.gz"): fn = k.rsplit("/", 1)[-1] s3.download_file(BUCKET_NAME, k, fn) yield k, fn if cont_token is None: return
def main(): s3res = session = boto3.Session( aws_access_key_id=getenv("ACCESS_KEY"), aws_secret_access_key=getenv("SECRET_KEY"), ).resource("s3") for s3key, fn in fetch_files(): ft = magic.from_file(fn) if "tar archive" not in ft: print(f"found {ft} at {s3key}") # continue # simply ignore already compressed files exit() # stop when compressed files are found tarfn = fn[:-3] rename(fn, tarfn) # keep the local file as a backup with open(tarfn, "rb") as f: inp = f.read() comp = gzip.compress(inp, compresslevel=9) ratio = len(inp) / len(comp) del inp print(f"uploading {s3key} compression ratio {ratio}") obj = s3res.Object(BUCKET_NAME, s3key) obj.put(Body=comp) del comp