import re
import json
import pandas as pd
import argparse
import os
import gzip
import shutil
# import numpy as np
def _str2bool(value):
    '''
    Parse a command-line string into a bool for the optional on/off switches.

    Accepts the usual spellings (true/false, yes/no, on/off, 1/0), ignoring case.
    Raises argparse.ArgumentTypeError for anything else so that argparse reports
    a normal usage error instead of silently treating the text as truthy.
    '''
    if isinstance(value, bool):
        return value
    text = str(value).strip().lower()
    if text in ('true', 't', 'yes', 'y', 'on', '1'):
        return True
    if text in ('false', 'f', 'no', 'n', 'off', '0'):
        return False
    raise argparse.ArgumentTypeError(f'expected a boolean value, got {value!r}')


parser = argparse.ArgumentParser(
    description='Process a csv table into MDV static site format'
)
parser.add_argument('-i', '--input', help='csv file to process', default='data.csv')
parser.add_argument('-o', '--outdir', help='output folder')
# Boolean switches: a bare flag means True, an explicit value ("--group false") is
# parsed by _str2bool. Without a type, any supplied string (even "False") is truthy.
parser.add_argument(
    '--discard_redundant', help='discard redundant columns',
    nargs='?', const=True, default=False, type=_str2bool,
)
# grouping options? rename columns so numbers have leading zeros?
parser.add_argument(
    '-g', '--group', help='group columns by regex',
    nargs='?', const=True, default=False, type=_str2bool,
)
parser.add_argument(
    '--group_by',
    help='regex with three capture groups (prefix, number, suffix) used to group columns',
    default=r'(.*?)(\d+)(.*)',
)
parser.add_argument('-s', '--separator', help='multitext separator', default=';')
# When True, get_column_type() inspects text columns and may classify them as
# 'multitext' or 'unique' instead of plain 'text'.
parse_multitext = True

args = parser.parse_args()
separator = args.separator

# Keep prompting until the input CSV path exists.
filename = args.input
while not os.path.exists(filename):
    input(f'file "{filename}" does not exist. Press enter to try again.')
    filename = input('CSV input file: ')

# The input file name (without its directory) names the datasource and output files.
basename = os.path.basename(filename)

# Ask for an output folder when none was given; an empty answer is asked again
# because os.makedirs('') would raise.
outdir = args.outdir
while not outdir:
    outdir = input('output folder: ')

# NOTE(review): umask(0) removes all permission masking for everything this process
# creates afterwards (the output folder and files) - confirm this is intended.
os.umask(0)
# exist_ok avoids the check-then-create race of "if not exists: makedirs".
os.makedirs(outdir, exist_ok=True)
### todo: non-hacky image handling
# If an 'images' folder sits next to the input CSV, copy it into the output folder
# unless the output folder already has one. has_images is also read by
# get_datasource() to add an "images" entry to the descriptor.
indir = os.path.dirname(filename)
has_images = os.path.exists(os.path.join(indir, 'images'))
if has_images and not os.path.exists(os.path.join(outdir, 'images')):
    try:
        shutil.copytree(os.path.join(indir, 'images'), os.path.join(outdir, 'images'))
    except OSError as err:
        # Best effort: carry on without the images, but say why instead of hiding it.
        # (shutil.Error is a subclass of OSError; a bare except would also swallow
        # KeyboardInterrupt / SystemExit.)
        print(f'warning: could not copy images folder: {err}')
print('reading csv...')
df = pd.read_csv(filename)

# Maps the pandas dtype name of a column to the datatype written to the descriptor.
types = {
    'float64': 'double',
    'int64': 'integer',
    'O': 'text',
    'object': 'text',
    'bool': 'boolean',  # 0 or 1? currently breaks this script if I make this integer
}
# Cache of column name -> datatype, filled by get_column_type(). It must exist before
# the first call; it is needed because get_datasource() later replaces text columns
# with integer indices, after which the dtype no longer reveals the original type.
col_types = {}
# if we were processing multiple sources, we should review global df / col_types...
def rename_columns(enabled=False):
    '''
    Rename columns whose name contains a number so the number has leading zeros.

    This is so they sort correctly in the UI. Every column of the global ``df`` whose
    name matches ``args.group_by`` is renamed in place from
    ``<group 1><group 2><group 3>`` to ``<group 1>_<group 2 padded to 3 digits><group 3>``.

    Renaming is switched off by default: the function previously returned before
    doing anything, so calling it without arguments remains a no-op. Pass
    ``enabled=True`` to actually rename.
    '''
    if not enabled:
        return
    renames = {}
    for name in df.columns:
        m = re.search(args.group_by, name)
        if not m:
            continue
        renames[name] = f'{m.group(1)}_{m.group(2).zfill(3)}{m.group(3)}'
    if renames:
        # One rename call for all columns instead of one per column.
        df.rename(columns=renames, inplace=True)
def get_column_type(name):
    '''
    Return the datatype string for column ``name`` of the global ``df``.

    The pandas dtype is looked up in ``types``. When ``parse_multitext`` is set, a
    text column is upgraded to 'unique' (more than 65536 distinct values) or to
    'multitext' (more than 256 distinct values, or any value containing
    ``separator``). The answer is stored in ``col_types`` and reused on later calls.

    Raises ValueError when the column's dtype has no entry in ``types``.
    '''
    # get_column_type is called from get_datasource() then convert_data_to_binary()
    # second call was getting wrong type, so remembering the values should help.
    if name in col_types:
        return col_types[name]
    v = df[name]
    # .get() rather than [] so an unmapped dtype reaches the ValueError below
    # instead of surfacing as a bare KeyError.
    ttype = types.get(str(v.dtype))
    if ttype is None:
        raise ValueError(f'unknown type {v.dtype} for {name}')
    if ttype == 'text' and parse_multitext:
        # Only text columns need the distinct values, so build the set here.
        unique_values = set(v)
        n = len(unique_values)
        if n > 65536:
            print(f'detected unique column "{name}" (not well tested with this script)')
            ttype = 'unique'
        # str(): values are not always strings, e.g. {False, True, nan} in a mixed column.
        elif n > 256 or any(separator in str(s) for s in unique_values):
            print(f'detected multitext column "{name}"')
            ttype = 'multitext'
    col_types[name] = ttype
    return ttype
def get_quantiles(col):
    '''
    Return symmetric lower/upper quantile pairs of ``col``.

    The result maps each tail fraction, kept as the string "0.001", "0.01" or
    "0.05", to ``[col.quantile(fraction), col.quantile(1 - fraction)]``.
    '''
    tails = ("0.001", "0.01", "0.05")
    return {
        tail: [col.quantile(float(tail)), col.quantile(1 - float(tail))]
        for tail in tails
    }
def get_text_indices(col):
    '''
    Encode ``col`` as indices into a list of its distinct values.

    Returns ``(indices, values)``: ``values`` holds each distinct value of ``col``
    once, converted with str(), and ``indices[i]`` is the position in ``values`` of
    the i-th element of ``col``.

    Distinct values are kept in order of first appearance. Going through set()
    made the order depend on string hashing, so the same input could produce
    different output from one run to the next.
    '''
    values = list(dict.fromkeys(col))
    val_dict = {value: i for i, value in enumerate(values)}
    return [val_dict[v] for v in col], [str(s) for s in values]
def get_column_groups():
    '''
    Group the columns of the global ``df`` by the ``args.group_by`` regex.

    A column whose name matches goes into the group named
    ``<group 1>_<group 3>``; columns that do not match are left out. Returns a list
    of ``{'name': ..., 'columns': [...]}`` dicts in order of first appearance.
    '''
    grouped = {}
    for column in df.columns:
        found = re.search(args.group_by, column)
        if found is None:
            continue
        key = f'{found.group(1)}_{found.group(3)}'
        # num = int(found.group(2))
        entry = grouped.setdefault(key, {'name': key, 'columns': []})
        entry['columns'].append(column)
    return list(grouped.values())
def _get_min_max(col, datatype):
    '''Return [min, max] of a numeric column as plain Python numbers, skipping NaN.'''
    # Series.min()/max() skip NaN; the builtin min()/max() give order-dependent
    # results as soon as a NaN is present.
    low, high = col.min(), col.max()
    if datatype == 'integer' and not (pd.isna(low) or pd.isna(high)):
        return [int(low), int(high)]
    return [float(low), float(high)]


def get_datasource():
    '''
    Build the datasource descriptor for the global dataframe ``df``.

    Has some side effects on the dataframe:
    if args.discard_redundant:
    - removes columns that are redundant (all the same value)
    text and multitext columns are converted to indices into their "values" list.

    Outputs a descriptor like this:
    {
        "name": "metric_table",
        "size": number of rows,
        "images"?: {
            "images": {
                "base_url": "./images/",
                "type": "png",
                "key_column": "Index"
            }
        },
        "columns": [
            {
                "datatype": "double" | "integer" | "text" | "multitext" | "unique",
                "name": "column_name",
                "field": "column_name",
                "minMax"?: [min, max],
                "quantiles"?: ...,
                "values"?: ['a', 'b', 'c'],
                "separator"?: multitext separator,
                "stringLength"?: longest value of a unique column,
            }
        ],
        "columnGroups": [{"name": ..., "columns": [...]}]
    }
    Boolean columns are described as "integer" with minMax [0, 1].
    '''
    descriptor = {
        "name": basename,
        "size": df.shape[0],
        "columns": []
    }
    if has_images:
        # todo: make this able to take some config, set proper type / key_column etc.
        # ideally, find a column that has values corresponding to the names of images in the folder...
        descriptor['images'] = {
            "images": {
                "base_url": "./images/",
                "type": "png",
                "key_column": "Index"
            }
        }
    # Iterate over a snapshot of the names: columns may be dropped inside the loop.
    for name in list(df.columns):
        col = df[name]
        # nunique(dropna=False) counts missing values as one value, so a column that
        # is entirely NaN is recognised as redundant too.
        if args.discard_redundant and col.nunique(dropna=False) == 1:
            df.drop(name, axis=1, inplace=True)
            continue
        datatype = get_column_type(name)
        col_desc = {"datatype": datatype, "name": name, "field": name}
        if datatype == 'boolean':
            # would be better to have a separate boolean type
            print(f'converting boolean {name} to number')
            col_desc['datatype'] = 'integer'
            col_desc['minMax'] = [0, 1]
        elif datatype in ('double', 'integer'):
            col_desc['minMax'] = _get_min_max(col, datatype)
            col_desc['quantiles'] = get_quantiles(col)
        elif datatype in ('text', 'multitext'):
            indices, values = get_text_indices(col)
            col_desc['values'] = values
            if datatype == 'multitext':
                col_desc['separator'] = separator
            # mutating df here...
            df[name] = indices
        elif datatype == 'unique':
            # str(): a missing value is a float and has no len(); default=0 covers
            # a table with no rows.
            col_desc['stringLength'] = max((len(str(v)) for v in col), default=0)
        descriptor['columns'].append(col_desc)
    # NOTE(review): args.group is not consulted here, groups are always written -
    # confirm whether that flag was meant to gate this.
    descriptor['columnGroups'] = get_column_groups()
    return descriptor
def replace_text_values(col, values):
    '''
    Translate every element of ``col`` into its position within ``values``.

    If a value occurs more than once in ``values`` its last position is used.
    Raises KeyError for an element of ``col`` that is not in ``values``.
    '''
    position_of = {}
    for position, value in enumerate(values):
        position_of[value] = position
    return list(map(position_of.__getitem__, col))
def get_views():
    '''
    Return the views mapping: one view, keyed and named by the global ``basename``,
    whose initial charts list for that datasource is empty.
    '''
    single_view = {"name": basename, 'initialCharts': {basename: []}}
    return {basename: single_view}
def get_state():
    '''
    Return the state mapping: the global ``basename`` is the only entry of
    "all_views" and is also the "initial_view".
    '''
    state = {}
    state["all_views"] = [basename]
    state["initial_view"] = basename
    return state
def convert_data_to_binary(df):
    '''
    Converts the dataframe to binary format.

    Every column is converted to its storage dtype, gzip-compressed on its own and
    appended to ``{outdir}/{basename}.b``. A companion ``{outdir}/{basename}.json``
    maps each column name to ``[first_byte, last_byte]`` (inclusive) of its
    compressed chunk within that file.

    The dataframe passed in is left unchanged; conversions are made on copies.
    '''
    # 'integer' and 'double' should be converted to float32 according to the spec
    storage_dtypes = {
        'integer': 'float32',
        'double': 'float32',
        'boolean': 'float32',
        'text': 'uint8',
        'multitext': 'uint16',
    }
    dfile = f'{outdir}/{basename}.b'
    index = {}
    current_pos = 0
    # "with" closes the file even if a column fails to convert part-way through.
    with open(dfile, 'wb') as o:
        for name in df.columns:
            col_type = get_column_type(name)
            values = df[name]
            target = storage_dtypes.get(col_type)
            if target is not None:
                print(f'converting {name} {col_type} to {target}')
                values = values.astype(target)
            # NOTE(review): any other type (e.g. 'unique') is written as-is; for an
            # object-dtype column tobytes() presumably dumps object references rather
            # than the text - confirm how such columns should be encoded.
            comp = gzip.compress(values.to_numpy().tobytes())
            new_pos = current_pos + len(comp)
            index[name] = [current_pos, new_pos - 1]
            o.write(comp)
            current_pos = new_pos
    # Swap the trailing '.b' for '.json' to name the index file.
    ifile = dfile[:dfile.rindex('.')] + '.json'
    with open(ifile, 'w') as f:
        f.write(json.dumps(index))
def main():
    '''
    Run the conversion: write datasources.json, the column binary with its index,
    views.json and state.json into the global ``outdir``.
    '''
    rename_columns()
    out_exists = os.path.exists(outdir)
    if not out_exists:
        print('creating output directory')
        os.makedirs(outdir)

    # The descriptor is built before the binary is written: building it replaces
    # text columns of df with integer indices.
    descriptor = get_datasource()
    with open(f'{outdir}/datasources.json', 'w') as ds_file:
        print('writing datasources.json')
        serialized = json.dumps([descriptor])
        ds_file.write(serialized)

    print('writing data binary')
    convert_data_to_binary(df)

    with open(f'{outdir}/views.json', 'w') as views_file:
        views_text = json.dumps(get_views())
        views_file.write(views_text)
    with open(f'{outdir}/state.json', 'w') as state_file:
        state_text = json.dumps(get_state())
        state_file.write(state_text)


if __name__ == '__main__':
    main()