diff --git a/examples/002_example/001_make_folders.py b/examples/002_example/001_make_folders.py
new file mode 100644
index 0000000000000000000000000000000000000000..adfe8cad3164ceb28e161b4bdc229262a92963f8
--- /dev/null
+++ b/examples/002_example/001_make_folders.py
@@ -0,0 +1,106 @@
+# %%
+import tree_maker
+from tree_maker import NodeJob
+import time
+# %%
+# Clearly, for this simple task one could do everything in the very same Python kernel,
+# BUT here we want to mimic the typical flow:
+# 1. MADX for optics matching/error seeding
+# 2. Tracking for FMA and/or DA studies
+# 3. simulation baby-sitting and
+# 4. postprocessing
+
+import numpy as np
+a=np.random.randn(50)
+b=np.random.randn(50)
+c=np.random.randn(10)
+
+my_list_original=[]
+for ii in c:
+    my_list_original+=list(np.sqrt(np.abs((a+b)*ii)))
+my_list_original=sorted(my_list_original)
+
+# %%
+"""
+#### The root of the tree
+"""
+start_time = time.time()
+# %%
+#root
+import os
+my_folder = os.getcwd()
+root = NodeJob(name='root', parent=None)
+root.path = my_folder + '/study_000'
+root.template_path = my_folder + '/templates'
+root.log_file = root.path + "/log.json"
+
+# %%
+"""
+#### First generation of nodes
+"""
+
+# %%
+#first generation
+for node in root.root.generation(0):
+    node.children=[NodeJob(name=f"{child:03}",
+                           parent=node,
+                           path=f"{node.path}/{child:03}",
+                           template_path = root.template_path+'/sum_it',
+                           #submit_command = f'python {root.template_path}/sum_it/run.py &',
+                           submit_command = f'bsub -q hpc_acc -e %J.err -o %J.out cd {node.path}/{child:03} && {root.template_path}/sum_it/run.sh &',
+                           log_file=f"{node.path}/{child:03}/log.json",
+                           dictionary={'a':float(a[child]),
+                                       'b':float(b[child])
+                                      })
+                   for child in range(len(a))]
+
+# To combine different lists one can use the product or the zip functions
+#import itertools
+#[[i, j, z] for i, j, z in itertools.product(['a','b'],['c','d'],[1,2,3])]
+#[[i, j, z] for i, j, z in zip(['a','b'],['c','d'],[1,2,3])]
+
+# %%
+"""
+#### Second generation of nodes
+"""
+
+# %%
+#second generation
+for node in root.root.generation(1):
+    node.children=[NodeJob(name=f"{child:03}",
+                           parent=node,
+                           path = f"{node.path}/{child:03}",
+                           template_path = f'{root.template_path}/multiply_it',
+                           #bsub -q hpc_acc -e %J.err -o %J.out cd $PWD && ./run.sh
+                           submit_command = f'bsub -q hpc_acc -e %J.err -o %J.out cd {node.path}/{child:03} && {root.template_path}/multiply_it/run.sh &',
+                           #submit_command = f'python {root.template_path}/multiply_it/run.py &',
+                           log_file=f"{node.path}/{child:03}/log.json",
+                           dictionary={'c': float(c[child])})
+                   for child in range(len(c))]
+
+root.to_json()
+
+
+print('Done with the tree creation.')
+print("--- %s seconds ---" % (time.time() - start_time))
+# %%
+"""
+### Cloning the templates of the nodes
+From Python objects we move the nodes to the file system.
+""" + +# %% +# We map the pythonic tree in a >folder< tree +start_time = time.time() +root.clean_log() +root.rm_children_folders() +from joblib import Parallel, delayed + +for depth in range(root.height): +# [x.clone_children() for x in root.generation(depth)] + Parallel(n_jobs=8)(delayed(x.clone_children)() for x in root.generation(depth)) + +# VERY IMPORTANT, tagging +root.tag_as('cloned') +print('The tree structure is moved to the file system.') +print("--- %s seconds ---" % (time.time() - start_time)) diff --git a/examples/002_example/002_chronjob.py b/examples/002_example/002_chronjob.py new file mode 100644 index 0000000000000000000000000000000000000000..0bde833f804c2672bc72743e678bc077d3c4dcf0 --- /dev/null +++ b/examples/002_example/002_chronjob.py @@ -0,0 +1,27 @@ +# %% +""" +Example of a chronjob +""" + +# %% +import tree_maker +from tree_maker import NodeJob + + +# %% +# Load the tree from a yaml +try: + root=tree_maker.tree_from_json( + f'./study_000/tree.json') +except Exception as e: + print(e) + print('Probably you forgot to edit the address of you json file...') + +if root.has_been('completed'): + print('All descendants of root are completed!') +else: + for node in root.descendants: + node.smart_run() + if all([descendant.has_been('completed') for descendant in root.descendants]): + root.tag_as('completed') + print('All descendants of root are completed!') diff --git a/examples/002_example/003_postprocessing.py b/examples/002_example/003_postprocessing.py new file mode 100644 index 0000000000000000000000000000000000000000..52d6e713e92185224772fbd611fe204d81e2ca9b --- /dev/null +++ b/examples/002_example/003_postprocessing.py @@ -0,0 +1,35 @@ +# %% +""" +Example of a chronjob +""" + +# %% +import tree_maker +from tree_maker import NodeJob +import pandas as pd +import awkward as ak +from joblib import Parallel, delayed + +#from dask import dataframe as dd +# %% +# Load the tree from a yaml +try: + root=tree_maker.tree_from_json( + f'./study_000/tree.json') +except Exception as e: + print(e) + print('Probably you forgot to edit the address of you json file...') + +my_list=[] +if root.has_been('completed'): + print('All descendants of root are completed!') + for node in root.generation(2): + #os.sytem(f'bsub cd {node.path} && {node.path_template} ') + #my_list.append(pd.read_parquet(f'{node.path}/test.parquet', columns=['x']).iloc[-1].x) + my_list.append(node.has_been('completed')) + #my_list.append(ak.from_parquet(f'{node.path}/test.parquet', columns=['x'])[-1,'x']) + #Parallel(n_jobs=16)(delayed(node.has_been)('completed') for node in root.generation(2)) + #print(my_list) +else: + print('Complete first all jobs') + diff --git a/examples/002_example/004_postprocessing.py b/examples/002_example/004_postprocessing.py new file mode 100644 index 0000000000000000000000000000000000000000..bcb06fd8df12919006054f6de9a67a1155ae86db --- /dev/null +++ b/examples/002_example/004_postprocessing.py @@ -0,0 +1,35 @@ +# %% +""" +Example of a chronjob +""" + +# %% +import tree_maker +from tree_maker import NodeJob +import pandas as pd +import awkward as ak +from joblib import Parallel, delayed + +#from dask import dataframe as dd +# %% +# Load the tree from a yaml +try: + root=tree_maker.tree_from_json( + f'./study_000/tree.json') +except Exception as e: + print(e) + print('Probably you forgot to edit the address of you json file...') + +my_list=[] +if root.has_been('completed'): + print('All descendants of root are completed!') + for node in root.generation(2)[0:100]: + #os.sytem(f'bsub cd 
+        #os.system(f'bsub cd {node.path} && {node.path_template} ')
+        #my_list.append(pd.read_parquet(f'{node.path}/test.parquet', columns=['x']).iloc[-1].x)
+        #ak.from_parquet('test.parquet', columns='x', row_groups=99)['x',-1]
+        my_list.append(ak.from_parquet(f'{node.path}/test.parquet', columns=['x'], row_groups=99)['x',-1])
+    #Parallel(n_jobs=16)(delayed(node.has_been)('completed') for node in root.generation(2))
+    print(my_list)
+else:
+    print('Complete all jobs first')
+
diff --git a/examples/002_example/005_postprocessing.py b/examples/002_example/005_postprocessing.py
new file mode 100644
index 0000000000000000000000000000000000000000..bb34f5b22c8b8a1db23a7ac611a5e811787289e8
--- /dev/null
+++ b/examples/002_example/005_postprocessing.py
@@ -0,0 +1,32 @@
+# %%
+"""
+Example of postprocessing
+"""
+
+# %%
+import tree_maker
+from tree_maker import NodeJob
+import pandas as pd
+import awkward as ak
+import os
+
+#from dask import dataframe as dd
+# %%
+# Load the tree from a json
+try:
+    root=tree_maker.tree_from_json(
+        f'./study_000/tree.json')
+except Exception as e:
+    print(e)
+    print('Probably you forgot to edit the address of your json file...')
+
+my_list=[]
+if root.has_been('completed'):
+    print('All descendants of root are completed!')
+    for node in root.generation(1):
+        node.tag_as('postprocessing_submitted')
+        node.submit_command=f'bsub -q hpc_acc {node.template_path}/postprocess.sh &'
+        node.submit()
+else:
+    print('Complete all jobs first')
+
diff --git a/examples/002_example/templates/multiply_it/config.yaml b/examples/002_example/templates/multiply_it/config.yaml
new file mode 100644
index 0000000000000000000000000000000000000000..af123f86879a80214cd89734e405627942305f93
--- /dev/null
+++ b/examples/002_example/templates/multiply_it/config.yaml
@@ -0,0 +1,4 @@
+# This is my input
+parent: '../sum_it' # this is the first element of the product
+c: -1 # this is the second element of the product
+log_file: './log.yaml'
\ No newline at end of file
diff --git a/examples/002_example/templates/multiply_it/log.yaml b/examples/002_example/templates/multiply_it/log.yaml
new file mode 100644
index 0000000000000000000000000000000000000000..1c0e2dbbd8b758763cdac69e3ee9d82c47152759
--- /dev/null
+++ b/examples/002_example/templates/multiply_it/log.yaml
@@ -0,0 +1,52 @@
+{
+  "0": {
+    "tag": "started",
+    "unix_time": 1624890907618272000,
+    "human_time": "2021-06-28 16:35:07.618272"
+  },
+  "1": {
+    "tag": "completed",
+    "unix_time": 1624890908593553920,
+    "human_time": "2021-06-28 16:35:08.593554"
+  },
+  "2": {
+    "tag": "started",
+    "unix_time": 1624890995812024064,
+    "human_time": "2021-06-28 16:36:35.812024"
+  },
+  "3": {
+    "tag": "completed",
+    "unix_time": 1624890995928683008,
+    "human_time": "2021-06-28 16:36:35.928683"
+  },
+  "4": {
+    "tag": "started",
+    "unix_time": 1624891021181616128,
+    "human_time": "2021-06-28 16:37:01.181616"
+  },
+  "5": {
+    "tag": "completed",
+    "unix_time": 1624891021380608000,
+    "human_time": "2021-06-28 16:37:01.380608"
+  },
+  "6": {
+    "tag": "started",
+    "unix_time": 1624891070778615040,
+    "human_time": "2021-06-28 16:37:50.778615"
+  },
+  "7": {
+    "tag": "completed",
+    "unix_time": 1624891070982253056,
+    "human_time": "2021-06-28 16:37:50.982253"
+  },
+  "8": {
+    "tag": "started",
+    "unix_time": 1624891074472503808,
+    "human_time": "2021-06-28 16:37:54.472504"
+  },
+  "9": {
+    "tag": "completed",
+    "unix_time": 1624891074613457920,
+    "human_time": "2021-06-28 16:37:54.613458"
+  }
+}
\ No newline at end of file
diff --git a/examples/002_example/templates/multiply_it/output.yaml b/examples/002_example/templates/multiply_it/output.yaml
new file mode 100644
index 0000000000000000000000000000000000000000..b37b5c8c3ab85eafc6e547fe86a6fc1b7b471dd0
--- /dev/null
+++ b/examples/002_example/templates/multiply_it/output.yaml
@@ -0,0 +1 @@
+result: 2
diff --git a/examples/002_example/templates/multiply_it/run.py b/examples/002_example/templates/multiply_it/run.py
new file mode 100644
index 0000000000000000000000000000000000000000..4e99e1c0a515f596180e022199da549372ab68c1
--- /dev/null
+++ b/examples/002_example/templates/multiply_it/run.py
@@ -0,0 +1,33 @@
+import json
+import numpy as np
+import ruamel.yaml
+import tree_maker
+
+# load the configuration
+with open('config.yaml', 'r') as file:
+    yaml = ruamel.yaml.YAML()
+    cfg = yaml.load(file)
+
+with open(cfg['parent']+'/output.yaml', 'r') as file:
+    yaml = ruamel.yaml.YAML()
+    parent_out = yaml.load(file)
+
+tree_maker.tag_json.tag_it(cfg['log_file'], 'started')
+
+# define the function (product of two numbers)
+def my_function(my_x, my_y):
+    'Just a multiplication'
+    return my_x*my_y
+
+# run the code
+result = my_function(parent_out['result'], cfg['c'])
+
+with open('output.yaml', 'w') as fp:
+    yaml = ruamel.yaml.YAML()
+    yaml.dump({'result': result}, fp)
+
+import pandas as pd
+
+pd.DataFrame(np.random.randn(100000,6), columns=['x','xp','y','yp','z','zp']).to_parquet('test.parquet', row_group_size=1000)
+
+tree_maker.tag_json.tag_it(cfg['log_file'], 'completed')
diff --git a/examples/002_example/templates/multiply_it/run.sh b/examples/002_example/templates/multiply_it/run.sh
new file mode 100755
index 0000000000000000000000000000000000000000..71826e6bf2979c96b8268029877d9e0a4b37ff6c
--- /dev/null
+++ b/examples/002_example/templates/multiply_it/run.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+source /afs/cern.ch/eng/tracking-tools/python_installations/miniconda3/bin/activate
+python /gpfs/gpfs/gpfs_maestro_home_new/hpc/sterbini/tree_maker/examples/002_example/templates/multiply_it/run.py
diff --git a/examples/002_example/templates/multiply_it/run.sub b/examples/002_example/templates/multiply_it/run.sub
new file mode 100644
index 0000000000000000000000000000000000000000..0aedcfa7fcc82bd71ff4365241fe0dab62d63984
--- /dev/null
+++ b/examples/002_example/templates/multiply_it/run.sub
@@ -0,0 +1,12 @@
+#initialdir = .
+executable = run.sh
+output = .output.txt
+error = .err.txt
+log = .log.txt
+should_transfer_files = yes
+when_to_transfer_output = on_exit
+transfer_input_files = config.yaml, run.py
+# The line below can be commented if necessary
+#transfer_output_files = output.yaml
++JobFlavour = "espresso"
+queue
diff --git a/examples/002_example/templates/multiply_it/test.parquet b/examples/002_example/templates/multiply_it/test.parquet
new file mode 100644
index 0000000000000000000000000000000000000000..e13e974da97f17ae10097ec2e19c02ad85d4a52f
Binary files /dev/null and b/examples/002_example/templates/multiply_it/test.parquet differ
diff --git a/examples/002_example/templates/sum_it/config.yaml b/examples/002_example/templates/sum_it/config.yaml
new file mode 100644
index 0000000000000000000000000000000000000000..a87ce4d7e300fd2ac7fd0b1b673ff95961a14eea
--- /dev/null
+++ b/examples/002_example/templates/sum_it/config.yaml
@@ -0,0 +1,5 @@
+# This is my input
+a: -1 # this is the first element of the sum
+b: -1 # this is the second element of the sum
+run_command: 'python run.py'
+log_file: './log.yaml'
diff --git a/examples/002_example/templates/sum_it/log.yaml b/examples/002_example/templates/sum_it/log.yaml
new file mode 100644
index 0000000000000000000000000000000000000000..70afad740ef8c41ac429f2bca44b0da4d0ddf20e
--- /dev/null
+++ b/examples/002_example/templates/sum_it/log.yaml
@@ -0,0 +1,8 @@
+0:
+  tag: started
+  unix_time: 1624652829.310353
+  human_time: 2021-06-25 22:27:09.310386
+1:
+  tag: completed
+  unix_time: 1624652829.318002
+  human_time: 2021-06-25 22:27:09.318006
diff --git a/examples/002_example/templates/sum_it/output.yaml b/examples/002_example/templates/sum_it/output.yaml
new file mode 100644
index 0000000000000000000000000000000000000000..49f08b507230fa33cd9831b0fa3b1876f5b19d4d
--- /dev/null
+++ b/examples/002_example/templates/sum_it/output.yaml
@@ -0,0 +1 @@
+result: -2
diff --git a/examples/002_example/templates/sum_it/postprocess.py b/examples/002_example/templates/sum_it/postprocess.py
new file mode 100644
index 0000000000000000000000000000000000000000..6bfcf2d668b0ac80e71c0bb750859c46f9b77079
--- /dev/null
+++ b/examples/002_example/templates/sum_it/postprocess.py
@@ -0,0 +1,11 @@
+import glob
+import awkward as ak
+import numpy as np
+
+my_folders=sorted(glob.glob('0*'))
+my_list=[]
+for my_folder in my_folders:
+    aux=ak.from_parquet(f'{my_folder}/test.parquet')
+    my_list.append(np.mean(aux))
+aux=ak.Array(my_list)
+ak.to_parquet(aux,'./summary.parquet')
diff --git a/examples/002_example/templates/sum_it/postprocess.sh b/examples/002_example/templates/sum_it/postprocess.sh
new file mode 100755
index 0000000000000000000000000000000000000000..30aa383c284f910819f38241b7c8b6e4609c6d7e
--- /dev/null
+++ b/examples/002_example/templates/sum_it/postprocess.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+#bsub -q hpc_acc -e %J.err -o %J.out cd $PWD && ./run.sh
+source /afs/cern.ch/eng/tracking-tools/python_installations/miniconda3/bin/activate
+python /gpfs/gpfs/gpfs_maestro_home_new/hpc/sterbini/tree_maker/examples/002_example/templates/sum_it/postprocess.py
diff --git a/examples/002_example/templates/sum_it/run.py b/examples/002_example/templates/sum_it/run.py
new file mode 100644
index 0000000000000000000000000000000000000000..5458868636ff065a768315943219b9659ed763f7
--- /dev/null
+++ b/examples/002_example/templates/sum_it/run.py
@@ -0,0 +1,25 @@
+import json
+import numpy as np
+import ruamel.yaml
+import tree_maker
+
+# load the configuration
+with open('config.yaml', 'r') as file:
+    yaml = ruamel.yaml.YAML()
+    cfg = yaml.load(file)
+
+tree_maker.tag_json.tag_it(cfg['log_file'], 'started')
+
+# define the function (sum of two numbers)
+def my_function(my_x, my_y):
+    'Just an addition'
+    return my_x+my_y
+
+# run the code
+result = my_function(cfg['a'], cfg['b'])
+
+with open('output.yaml', 'w') as fp:
+    yaml = ruamel.yaml.YAML()
+    yaml.dump({'result': result}, fp)
+
+tree_maker.tag_json.tag_it(cfg['log_file'], 'completed')
diff --git a/examples/002_example/templates/sum_it/run.sh b/examples/002_example/templates/sum_it/run.sh
new file mode 100755
index 0000000000000000000000000000000000000000..33aae7ea3c9abc6666679ebf633b5b2976da9995
--- /dev/null
+++ b/examples/002_example/templates/sum_it/run.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+#bsub -q hpc_acc -e %J.err -o %J.out cd $PWD && ./run.sh
+source /afs/cern.ch/eng/tracking-tools/python_installations/miniconda3/bin/activate
+python /gpfs/gpfs/gpfs_maestro_home_new/hpc/sterbini/tree_maker/examples/002_example/templates/sum_it/run.py
diff --git a/examples/002_example/templates/sum_it/run.sub b/examples/002_example/templates/sum_it/run.sub
new file mode 100644
index 0000000000000000000000000000000000000000..0aedcfa7fcc82bd71ff4365241fe0dab62d63984
--- /dev/null
+++ b/examples/002_example/templates/sum_it/run.sub
@@ -0,0 +1,12 @@
+#initialdir = .
+executable = run.sh
+output = .output.txt
+error = .err.txt
+log = .log.txt
+should_transfer_files = yes
+when_to_transfer_output = on_exit
+transfer_input_files = config.yaml, run.py
+# The line below can be commented if necessary
+#transfer_output_files = output.yaml
++JobFlavour = "espresso"
+queue
diff --git a/tree_maker/NodeJob.py b/tree_maker/NodeJob.py
index 4ea0b372e7787cd9cd46a96baa4ab7d6007bbd29..f2017184a6f21ebed5e341b0bcf64dc3ef361d74 100644
--- a/tree_maker/NodeJob.py
+++ b/tree_maker/NodeJob.py
@@ -123,14 +123,13 @@ class NodeJob(NodeJobBase, NodeMixin): # Add Node feature
         return False
 
     def has_been(self, tag):
-        if self._is_logging_file():
-            my_df= pd.DataFrame(tree_maker.from_yaml(self.log_file)).transpose()
-            if (len(my_df)>0) and (tag in my_df['tag'].values):
-                return True
-            else:
-                return False
+        #if self._is_logging_file():
+        if tag in tree_maker.from_json(self.log_file).keys():
+            return True
         else:
-            return False
+            return False
+        #else:
+        #    return False
 
     def has_not_been(self, tag):
         return not self.has_been(tag)
diff --git a/tree_maker/__init__.py b/tree_maker/__init__.py
index fdb88282b41be950ab37c663d3be81a266b27efe..fa42a9e051012c1a45dfb5054efbdeee18d09f26 100644
--- a/tree_maker/__init__.py
+++ b/tree_maker/__init__.py
@@ -10,6 +10,7 @@ from .NodeJob import NodeJob
 from .general import tree_from_yaml
 from .general import tree_from_json
 from .general import from_yaml
+from .general import from_json
 
 from .tag import *
 from .tag_json import *
diff --git a/tree_maker/general.py b/tree_maker/general.py
index 92eb75abd547d6db45842cd58146754d43e2142e..f7f8ce0e17b74a341b37a6aea4faa17c8b15f64a 100644
--- a/tree_maker/general.py
+++ b/tree_maker/general.py
@@ -5,6 +5,7 @@ from tree_maker import NodeJob
 import yaml # pip install pyyaml
 import ruamel.yaml
 import json
+import orjson
 
 ryaml = ruamel.yaml.YAML()
 
@@ -14,7 +15,7 @@ def tree_from_yaml(filename='tree.yaml'):
 
 def tree_from_json(filename='tree.json'):
     with open(filename, "r") as file:
-        return DictImporter(nodecls=NodeJob).import_(json.load(file))
+        return DictImporter(nodecls=NodeJob).import_(orjson.loads(file.read()))
 
 def from_yaml(filename):
     try:
         with open(filename, 'r') as file:
             return ryaml.load(file)
     except Exception as e:
         print(e)
         return {}
+
+def from_json(filename, verbose=False):
+    try:
+        with open(filename, 'r') as file:
+            return orjson.loads(file.read())
+    except Exception as e:
+        if verbose: print(e)
+        return {}
diff --git a/tree_maker/tag_json.py b/tree_maker/tag_json.py
index 80dd29115eee9efbebbd2c555a59132bf1272155..c3a0d9313e1ea39a79efceaf69cd7c914b173742 100644
--- a/tree_maker/tag_json.py
+++ b/tree_maker/tag_json.py
@@ -92,10 +92,11 @@ def tag_it(myfile, mycomment):
     This will create a a human readable and a unix time stamp
     and append that to 'myfile', including the comment 'hello'
     """
-    stage = get_last_stage(myfile)
-    my_dict = {stage: {}}
-    my_dict[stage]['tag'] = mycomment
+    #stage = get_last_stage(myfile)
+    #my_dict = {stage: {}}
+    #my_dict[stage]['tag'] = mycomment
     my_now=datetime.datetime.now()
-    my_dict[stage]['unix_time'] = int(1e9*my_now.timestamp()) #in nanoseconds
-    my_dict[stage]['human_time'] = str(my_now)
-    append_json(my_dict, myfile)
+    #my_dict[stage]['unix_time'] = int(1e9*my_now.timestamp()) #in nanoseconds
+    #my_dict[stage]['human_time'] = str(my_now)
+    #append_json(my_dict, myfile)
+    append_json({mycomment: int(1e9*my_now.timestamp())}, myfile)
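
Reviewer note, not part of the patch: a minimal sketch of how the new JSON-based logging above fits together, assuming a checkout with this change installed; the log path below is purely illustrative.

    import tree_maker

    log_file = './study_000/000/000/log.json'          # hypothetical log location
    tree_maker.tag_json.tag_it(log_file, 'started')     # appends {"started": <unix time in ns>} to the JSON log
    tags = tree_maker.from_json(log_file)               # returns {} if the file is missing or unreadable
    print('started' in tags.keys())                     # this is the check NodeJob.has_been() now performs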