Source code for mdvtools.csv_to_static

import re
import json
import pandas as pd
import argparse
import os
import gzip
import shutil
# import numpy as np

def _str_to_bool(value):
    """Interpret a command-line string as a boolean.

    Only the usual "negative" spellings (and the empty string) map to False;
    every other value maps to True, so invocations that previously passed an
    arbitrary truthy string keep working.
    """
    return value.strip().lower() not in ('', '0', 'false', 'f', 'no', 'n', 'off')


parser = argparse.ArgumentParser(
    description='Process a csv table into MDV static site format'
)
parser.add_argument('-i', '--input', help='csv file to process', default='data.csv')
parser.add_argument('-o', '--outdir', help='output folder')
# Without a `type`, any supplied value (including the string "False") was
# truthy. nargs='?' + const=True additionally allows a bare
# `--discard_redundant` flag, while `--discard_redundant <value>` still works.
parser.add_argument(
    '--discard_redundant',
    help='discard redundant columns',
    nargs='?',
    const=True,
    default=False,
    type=_str_to_bool,
)
# grouping options? rename columns so numbers have leading zeros?
# NOTE(review): `--group` is not read by any code visible in this file.
parser.add_argument('-g', '--group', help='group columns by regex', default=False)
parser.add_argument('--group_by', help='group columns by regex', default=r'(.*?)(\d+)(.*)')
parser.add_argument('-s', '--separator', help='multitext separator', default=';')
# When True, get_column_type() inspects text columns to decide whether they
# should be treated as 'unique' or 'multitext' instead of plain 'text'.
parse_multitext = True

args = parser.parse_args()
separator = args.separator

# Resolve the input file, prompting interactively until an existing path is given.
filename = args.input
while not os.path.exists(filename):
    input(f'file "{filename}" does not exist. Press enter to try again.')
    filename = input('CSV input file: ')
basename = os.path.basename(filename)

# Resolve the output folder, prompting if it was not supplied on the command line.
outdir = args.outdir
if not outdir:
    outdir = input('output folder: ')
# Clear the umask so the created output folder is not permission-restricted.
os.umask(0)
if not os.path.exists(outdir):
    os.makedirs(outdir)

### todo: non-hacky image handling
# An 'images' folder next to the input csv is copied alongside the output.
indir = os.path.dirname(filename)
has_images = os.path.exists(os.path.join(indir, 'images'))
if has_images and not os.path.exists(os.path.join(outdir, 'images')):
    try:
        shutil.copytree(os.path.join(indir, 'images'), os.path.join(outdir, 'images'))
    except OSError as err:
        # Copying images stays best-effort, but the failure is reported rather
        # than silently swallowed (shutil.Error is a subclass of OSError).
        print(f'warning: could not copy images folder: {err}')

print('reading csv...')
# The whole table is loaded once at import time and shared (and mutated) by
# the functions below.
df = pd.read_csv(filename)

# Lookup from pandas dtype name to the datatype label written to the descriptor.
types = {
    'float64': 'double',
    'int64': 'integer',
    'O': 'text',
    'object': 'text',
    # 0 or 1? currently breaks this script if I make this integer
    'bool': 'boolean',
}

# if we were processing multiple sources, we should review global df / col_types...
# Cache of column name -> datatype label, populated by get_column_type().
col_types = {}
def rename_columns(enabled=False):
    """Zero-pad the numeric part of matching column names in the global ``df``.

    Columns whose name matches ``args.group_by`` are renamed to
    ``<group 1>_<group 2 padded to 3 digits><group 3>`` so that numbered
    columns sort correctly in the UI.

    The renaming was previously switched off by an unconditional ``return``
    (leaving unreachable code that referenced the misspelt ``args.groub_by``).
    It remains off by default, so calling ``rename_columns()`` is still a
    no-op; pass ``enabled=True`` to actually perform the rename.

    Args:
        enabled: when False (default) nothing is done.

    Returns:
        None. ``df`` is modified in place when enabled.
    """
    if not enabled:
        return
    pattern = re.compile(args.group_by)
    renames = {}
    for name in df.columns:
        m = pattern.search(name)
        if not m:
            continue
        renames[name] = f'{m.group(1)}_{m.group(2).zfill(3)}{m.group(3)}'
    if renames:
        # One rename call for all columns instead of one per column.
        df.rename(columns=renames, inplace=True)
def get_column_type(name):
    """Return the datatype label for column ``name`` of the global ``df``.

    The label is looked up from the column's pandas dtype via ``types``.
    When ``parse_multitext`` is set, text columns are refined further:
    more than 65536 distinct values -> 'unique'; more than 256 distinct
    values, or any value containing ``separator`` -> 'multitext'.

    Results are cached in ``col_types``. This matters because
    get_datasource() overwrites text columns with integer indices, so a later
    call (from convert_data_to_binary()) would otherwise see a different dtype.

    Args:
        name: column name in ``df``.

    Returns:
        One of the labels in ``types`` or 'unique' / 'multitext'.

    Raises:
        ValueError: if the column's dtype has no entry in ``types``.
    """
    if name in col_types:
        return col_types[name]

    v = df[name]
    # .get() rather than [] so that an unmapped dtype reaches the descriptive
    # ValueError below instead of failing with a bare KeyError.
    ttype = types.get(str(v.dtype))
    if ttype is None:
        raise ValueError(f'unknown type {v.dtype} for {name}')

    if ttype == 'text' and parse_multitext:
        # Only text columns need the set of distinct values.
        unique_values = set(v)
        n = len(unique_values)
        if n > 65536:
            print(f'detected unique column "{name}" (not well tested with this script)')
            ttype = 'unique'
        # str(s): object columns can hold non-string values (e.g. booleans or
        # NaN), for which `separator in s` would raise.
        elif n > 256 or any(separator in str(s) for s in unique_values):
            print(f'detected multitext column "{name}"')
            ttype = 'multitext'

    col_types[name] = ttype
    return ttype
def get_quantiles(col):
    """Compute symmetric lower/upper quantile pairs for a numeric column.

    Args:
        col: object exposing ``quantile(q)`` (a pandas Series in this script).

    Returns:
        dict keyed by the strings "0.001", "0.01" and "0.05"; each value is
        the list ``[col.quantile(q), col.quantile(1 - q)]``.
    """
    def tail_pair(label):
        # The label doubles as the dictionary key and, parsed, as the fraction.
        lower = col.quantile(float(label))
        upper = col.quantile(1-float(label))
        return [lower, upper]

    return {label: tail_pair(label) for label in ("0.001", "0.01", "0.05")}
def get_text_indices(col):
    """Encode a column of values as integer indices into a list of distinct values.

    Distinct values are numbered in order of first appearance. (Previously the
    numbering came from iterating a ``set``, whose order for strings changes
    between interpreter runs, making the generated files non-reproducible.)

    Args:
        col: iterable of hashable values.

    Returns:
        A tuple ``(indices, values)`` where ``indices[i]`` is the position in
        ``values`` of the i-th element of ``col`` and ``values`` holds the
        distinct values converted with ``str()``.
    """
    codes = {}
    # setdefault returns the existing code for a known value, otherwise
    # registers the next free code; dicts preserve insertion order.
    indices = [codes.setdefault(value, len(codes)) for value in col]
    return indices, [str(s) for s in codes]
def get_column_groups():
    """Group the columns of the global ``df`` by the ``args.group_by`` regex.

    A column that matches is assigned to the group named
    ``<regex group 1>_<regex group 3>``; columns that do not match are skipped.

    Returns:
        A list of ``{'name': group_name, 'columns': [column names]}`` dicts,
        in order of each group's first appearance.
    """
    grouped = {}
    for column in df.columns:
        match = re.search(args.group_by, column)
        if match is None:
            continue
        prefix, suffix = match.group(1, 3)
        key = f'{prefix}_{suffix}'
        entry = grouped.setdefault(key, {'name': key, 'columns': []})
        entry['columns'].append(column)
    return list(grouped.values())
def get_datasource():
    '''
    Build the datasource descriptor for the global dataframe ``df``.

    Has some side effects on the dataframe:
    - if args.discard_redundant: columns holding a single distinct value
      are removed.
    - text and multitext columns are replaced by integer indices into the
      column's "values" list.

    Outputs a descriptor like this:
    {
        "name": basename of the input file,
        "size": number of rows,
        "images"?: {
            "images": {
                "base_url": "./images/",
                "type": "png",
                "key_column": "Index"
            }
        },
        "columns": [
            {
                "datatype": "double" | "integer" | "text" | "multitext" | "unique",
                "name": "column_name",
                "field": "column_name",
                "minMax"?: [min, max],
                "quantiles"?: ...,
                "values"?: ['a', 'b', 'c'],
                "separator"?: multitext separator,
                "stringLength"?: longest value length (unique columns),
            }
        ],
        "columnGroups": result of get_column_groups()
    }
    '''
    descriptor = {
        "name": basename,
        "size": df.shape[0],
        "columns": []
    }
    if has_images:
        # todo: make this able to take some config, set proper type / key_column etc.
        # ideally, find a column that has values corresponding to the names of images in the folder...
        descriptor['images'] = {
            "images": {
                "base_url": "./images/",
                "type": "png",
                "key_column": "Index"
            }
        }
    # Iterate over a snapshot of the names: columns may be dropped in the loop.
    for name in list(df.columns):
        col = df[name]
        if args.discard_redundant and len(set(col)) == 1:
            df.drop(name, axis=1, inplace=True)
            continue
        datatype = get_column_type(name)
        col_desc = {
            "datatype": datatype,
            "name": name,
            "field": name
        }
        if datatype == 'boolean':
            # would be better to have a separate boolean type
            print(f'converting boolean {name} to number')
            col_desc['datatype'] = 'integer'
            col_desc['minMax'] = [0, 1]
        elif datatype in ('double', 'integer'):
            # Series.min()/max() skip NaN (like quantile() does). The builtin
            # min()/max() give order-dependent results when NaN is present.
            # int()/float() turn the numpy scalars into plain Python numbers.
            to_native = int if datatype == 'integer' else float
            col_desc['minMax'] = [to_native(col.min()), to_native(col.max())]
            col_desc['quantiles'] = get_quantiles(col)
        elif datatype in ('text', 'multitext'):
            indices, values = get_text_indices(col)
            col_desc['values'] = values
            if datatype == 'multitext':
                col_desc['separator'] = separator
            # mutating df here...
            df[name] = indices
        elif datatype == 'unique':
            col_desc['stringLength'] = max(len(v) for v in col)
        descriptor['columns'].append(col_desc)
    descriptor['columnGroups'] = get_column_groups()
    return descriptor
def replace_text_values(col, values):
    """Map every element of ``col`` to its position in ``values``.

    Args:
        col: iterable of values to encode.
        values: iterable of the known values; if a value occurs more than
            once, its last position is used.

    Returns:
        list of integer positions, one per element of ``col``.

    Raises:
        KeyError: if an element of ``col`` does not occur in ``values``.
    """
    lookup = {}
    for position, value in enumerate(values):
        lookup[value] = position
    return list(map(lookup.__getitem__, col))
def get_views():
    """Return the views mapping: a single view named after the input file.

    The view is keyed by the global ``basename`` and starts with an empty
    chart list for the datasource of the same name.
    """
    view = {"name": basename, 'initialCharts': {basename: []}}
    return {basename: view}
def get_state():
    """Return the state mapping listing the single view (the global ``basename``)
    and selecting it as the initial view."""
    state = {}
    state["all_views"] = [basename]
    state["initial_view"] = basename
    return state
def convert_data_to_binary(df):
    '''
    Converts the dataframe to binary format.

    Every column is cast according to its datatype label, gzip-compressed
    on its own and appended to ``{outdir}/{basename}.b``. A companion
    ``{outdir}/{basename}.json`` maps each column name to the inclusive
    ``[first_byte, last_byte]`` offsets of its compressed block.

    Args:
        df: dataframe to serialise; its columns are cast in place.
    '''
    dfile = f'{outdir}/{basename}.b'
    index = {}
    current_pos = 0
    # Context manager so the file is closed even if a column fails to convert.
    with open(dfile, 'wb') as out:
        for name in df.columns:
            # 'integer' and 'double' should be converted to float32 according to the spec
            col_type = get_column_type(name)
            if col_type in ('integer', 'double', 'boolean'):
                print(f'converting {name} {col_type} to float32')
                df[name] = df[name].astype('float32')
            elif col_type == 'text':
                print(f'converting {name} {col_type} to uint8')
                df[name] = df[name].astype('uint8')
            elif col_type == 'multitext':
                print(f'converting {name} {col_type} to uint16')
                df[name] = df[name].astype('uint16')
            # NOTE(review): 'unique' columns are serialised without any
            # conversion - confirm that this is the intended encoding.
            comp = gzip.compress(df[name].to_numpy().tobytes())
            new_pos = current_pos + len(comp)
            index[name] = [current_pos, new_pos - 1]
            out.write(comp)
            current_pos = new_pos

    # Same path as the data file, with the '.b' extension swapped for '.json'.
    ifile = dfile[:dfile.rindex('.')] + '.json'
    with open(ifile, 'w') as f:
        f.write(json.dumps(index))
def main():
    """Run the conversion: write datasources.json, the data binary (with its
    index), views.json and state.json into the global ``outdir``."""
    rename_columns()

    out_exists = os.path.exists(outdir)
    if not out_exists:
        print('creating output directory')
        os.makedirs(outdir)

    # Building the descriptor also rewrites text columns of df as indices,
    # so it has to happen before the binary conversion below.
    datasource = get_datasource()
    with open(f'{outdir}/datasources.json', 'w') as ds_file:
        print('writing datasources.json')
        payload = json.dumps([datasource])
        ds_file.write(payload)

    print('writing data binary')
    convert_data_to_binary(df)

    with open(f'{outdir}/views.json', 'w') as views_file:
        views_file.write(json.dumps(get_views()))

    with open(f'{outdir}/state.json', 'w') as state_file:
        state_file.write(json.dumps(get_state()))


if __name__ == '__main__':
    main()