frogress/consumer_scripts/aristophanes.py

312 lines
9.2 KiB
Python
Raw Normal View History

#! /usr/bin/env python3
import argparse
import csv
from io import FileIO, StringIO
import json
import requests
from typing import Iterable, List, Any, Dict
# Root of the data endpoint; when non-empty it takes precedence over any
# base URL carried on the parsed arguments.
BASE_URL = "http://127.0.0.1:8000/data"


def make_slug_url(args: argparse.Namespace) -> str:
    """Return the endpoint URL for the project/version/category in *args*.

    The path starts at ``BASE_URL`` (or ``args.base_url`` when ``BASE_URL``
    is empty) and is followed by each of ``args.project``, ``args.version``
    and ``args.category`` that is not the empty string, joined with ``/``.
    """
    root = BASE_URL if BASE_URL else args.base_url
    segments = [root]
    segments.extend(
        part
        for part in (args.project, args.version, args.category)
        if part != ""
    )
    return "/".join(segments)
def make_url_options(args: argparse.Namespace) -> str:
    """Return the query string (including the leading ``?``) for *args*.

    Currently the only option is ``all=true``, emitted when ``args.all`` is
    truthy. Returns the empty string when no option applies.
    """
    options: List[str] = []
    if args.all:
        # append, not +=: `list += str` extends the list one character at a
        # time, which used to yield "?a&l&l&=&t&r&u&e".
        options.append("all=true")
    if not options:
        return ""
    return "?" + "&".join(options)
def make_url(args: argparse.Namespace) -> str:
    """Return the slug URL for *args* immediately followed by its query string."""
    return make_slug_url(args) + make_url_options(args)
# def dict_filter(input: list, *keys) -> dict:
# output: list = []
# for d in input:
# output.append(dict((k, d[k]) for k in keys))
# return output
# Column names, in order, of the primary progress CSV. Names of the form
# "<section>/<measure>" are split on "/" by double_csv_to_json.
fields = [
    "csv_version",
    "timestamp",
    "git_hash",
    "code_matching",
    "code_total",
    *(
        f"{section}/{measure}"
        for section in ("boot", "code", "overlays")
        for measure in ("matching", "total")
    ),
    "asm",
    "nonmatching_functions_count",
    "assets_identified",
    "assets_total",
    *(
        f"{section}/{measure}"
        for section in (
            "archives",
            "audio",
            "interface",
            "misc",
            "objects",
            "scenes",
            "text",
        )
        for measure in ("identified", "total")
    ),
]
# Column names, in order, of the supplementary ("extra") progress CSV. It has
# the same layout as the primary CSV but reports decompiled / debinarised
# measures instead of matching / identified ones.
extra_fields = [
    "csv_version",
    "timestamp",
    "git_hash",
    "code_decompiled",
    "code_total",
    *(
        f"{section}/{measure}"
        for section in ("boot", "code", "overlays")
        for measure in ("decompiled", "total")
    ),
    "asm",
    "nonmatching_functions_count",
    "assets_debinarised",
    "assets_total",
    *(
        f"{section}/{measure}"
        for section in (
            "archives",
            "audio",
            "interface",
            "misc",
            "objects",
            "scenes",
            "text",
        )
        for measure in ("debinarised", "total")
    ),
]
# Top-level keys of each converted row. A CSV column whose name starts with
# one of these followed by "/" is stored under that key; every other
# non-metadata column is stored under "default".
categories = [
    "default",
    # Sections the CSVs report with matching / decompiled measures.
    "boot", "code", "overlays",
    # Listed as a category, although no CSV column uses an "assets/" prefix.
    "assets",
    # Sections the CSVs report with identified / debinarised measures.
    "archives", "audio", "interface", "misc", "objects", "scenes", "text",
]
def double_csv_to_json(
    input: Iterable[str], extra_input: Iterable[str], output: List[Any]
) -> None:
    """Convert paired progress CSV rows into nested dicts appended to *output*.

    Each row of *input* (columns named by ``fields``) becomes one dict:

    * ``timestamp`` is stored as an int and ``git_hash`` as a string;
      ``csv_version`` is dropped.
    * A column named ``"<category>/<measure>"`` whose category is listed in
      ``categories`` is stored as ``row[category][measure]`` (int).
    * Every other column is stored as ``row["default"][column]`` (int).

    When *extra_input* is truthy, its rows (columns named by
    ``extra_fields``) are paired positionally with the primary rows. Only
    measures the primary row did not already provide are copied over; the
    extra row's metadata columns are ignored. No check is made that the
    paired rows describe the same commit.

    If *extra_input* runs out of rows before *input* does, the remaining
    primary rows are emitted without extra measures (previously this raised
    a bare ``StopIteration``).

    Args:
        input: Lines of the primary CSV (no header row).
        extra_input: Lines of the supplementary CSV, or a falsy value
            (e.g. ``None``) to skip it.
        output: List that receives one dict per primary row, in order.

    Raises:
        TypeError / ValueError: If a numeric column is missing or is not an
            integer literal.
    """
    csv_reader = csv.DictReader(input, fields)
    # Only build the second reader when there is something to read, so a
    # falsy extra_input such as None is accepted.
    extra_csv_reader = (
        csv.DictReader(extra_input, extra_fields) if extra_input else None
    )

    for row in csv_reader:
        new_row: Dict[str, Any] = {}
        _fold_csv_row(row, new_row, primary=True)

        if extra_csv_reader is not None:
            # next() with a default instead of __next__(): a short extra
            # file must not abort the conversion with StopIteration.
            extra_row = next(extra_csv_reader, None)
            if extra_row is not None:
                _fold_csv_row(extra_row, new_row, primary=False)

        output.append(new_row)


def _fold_csv_row(row: Dict[Any, Any], new_row: Dict[str, Any], primary: bool) -> None:
    """Merge one parsed CSV *row* into *new_row*.

    With ``primary=True`` every measure is written and the ``timestamp`` /
    ``git_hash`` metadata is recorded. With ``primary=False`` metadata is
    skipped and a measure is only written when *new_row* lacks it.
    """
    for field, value in row.items():
        parts = field.split("/")
        category = parts[0]

        if category in categories:
            bucket = new_row.setdefault(category, {})
            key = parts[1]
        elif category in ("csv_version", "timestamp", "git_hash"):
            if primary and category == "git_hash":
                new_row[category] = value
            elif primary and category == "timestamp":
                new_row[category] = int(value)
            # csv_version is never forwarded; extra-row metadata is ignored.
            continue
        else:
            # Columns without a known category prefix live under "default",
            # keyed by the text before any "/".
            bucket = new_row.setdefault("default", {})
            key = category

        if primary or key not in bucket:
            bucket[key] = int(value)
# filter_fields = [
# "timestamp",
# "git_hash",
# "matching_bytes",
# "total_bytes",
# ]
# output = dict_filter(output, filter_fields)
# print(output)
# def confuse_dicts(input, output: dict):
# mid = []
# for row in input:
# newRow = {}
# for category in categories:
# newRow[category] = {}
# for field in ["timestamp", "git_hash"]:
# newRow[category][field] = row.get(field)
# for field in row.get(category):
# newRow[category][field] = row.get(category).get(field)
# mid.append(newRow)
# for category in categories:
# output[category] = []
# for row in mid:
# output[category].append(row.get(category))
# def csv_to_category_json(args: argparse.Namespace, input):
# ...
# Sample rows of the primary progress CSV (29 columns, matching `fields`).
# main() feeds this string to double_csv_to_json as the primary input.
test_csv_input = """
2,1615435438,e788bfecbfb10afd4182332db99bb562ea75b1de,103860,4747584,31572,86064,58624,1065936,13664,3595584,4597948,49,0,40816656,0,434608,0,6029280,0,801520,0,3900928,0,13518304,0,15695712,0,436304
2,1660614141,bb96e47f8df9fa42e2120b7acda90432bacfde39,3143604,4747584,78172,86064,507828,1065936,2557604,3595584,1613196,31,3592092,40816656,0,434608,0,6029280,80976,801520,328616,3900928,3146908,13518304,35592,15695712,0,436304
"""
# Sample rows of the supplementary CSV (29 columns, matching `extra_fields`),
# for the same two timestamps / git hashes as test_csv_input.
test_extra_csv_input = """
2,1615435438,e788bfecbfb10afd4182332db99bb562ea75b1de,120152,4747584,36352,86064,70136,1065936,13664,3595584,4582152,0,0,40816656,0,434608,0,6029280,0,801520,0,3900928,0,13518304,0,15695712,0,436304
2,1660614141,bb96e47f8df9fa42e2120b7acda90432bacfde39,3177404,4747584,78544,86064,522780,1065936,2576080,3595584,1579396,0,29774016,40816656,0,434608,0,6029280,80976,801520,479024,3900928,13518304,13518304,15695712,15695712,0,436304
"""
def main() -> None:
    """Parse the command line, build the request URL and POST the sample data.

    The built-in sample CSVs (``test_csv_input`` / ``test_extra_csv_input``)
    are converted with ``double_csv_to_json`` and sent as the ``data`` member
    of a JSON body to the endpoint selected by ``--project``, ``--version``
    and ``--category``. The URL, the request body, the response status code
    and the response text are printed to stdout.
    """
    parser = argparse.ArgumentParser(
        description="Talk to frogress.",
        epilog="",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument("-p", "--project", help="", default="")
    parser.add_argument("-v", "--version", help="", default="")
    parser.add_argument("-c", "--category", help="", default="")
    parser.add_argument(
        "-a",
        "--all",
        help="fetch history instead of just most recent entry",
        action="store_true",
    )
    args = parser.parse_args()

    # The trailing slash must come before the query string, and all options
    # must share one "?". Appending "/?format=json" after an existing
    # "?all=true" produced ".../data?all=true/?format=json".
    query = ["format=json"]
    if args.all:
        query.append("all=true")
    url = make_slug_url(args) + "/" + "?" + "&".join(query)
    print(url)

    with StringIO(test_csv_input) as f, StringIO(test_extra_csv_input) as g:
        output: List[Any] = []
        double_csv_to_json(f, g, output)

    # NOTE(review): "2" looks like a placeholder API key — confirm before
    # pointing this at a real server.
    request_body = {"api_key": "2", "data": output}
    # Print the pretty-printed JSON that was previously built but never used
    # (the raw dict repr was printed instead).
    print(json.dumps(request_body, indent=4))

    with requests.post(url, json=request_body) as r:
        print(r.status_code)
        print(r.text)
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported.
if __name__ == "__main__":
    main()