Skip to content
Snippets Groups Projects
Commit ea783148 authored by Guido Sterbini's avatar Guido Sterbini
Browse files
parents 3e7c1e4e ea9a9662
No related branches found
No related tags found
2 merge requests!2Dev v0.0.1,!1Dev v0.0.1
Pipeline #2787786 failed
Showing
with 402 additions and 12 deletions
......@@ -3,18 +3,10 @@ tree_maker API documentation
tag.py
------
.. autofunction:: tree_maker.tag.read_yaml
|
.. autofunction:: tree_maker.tag.write_yaml
|
.. autofunction:: tree_maker.tag.append_yaml
|
.. autofunction:: tree_maker.tag.get_last_stage
|
.. autofunction:: tree_maker.tag.tag_first
|
.. autofunction:: tree_maker.tag.tag_it
.. automodule:: tree_maker.tag
:members:
NodeJob.py
----------
.. autofunction:: tree_maker.NodeJob.NodeJob.print_it
.. autoclass:: tree_maker.NodeJob.NodeJob
:members:
# %%
import tree_maker
from tree_maker import NodeJob
import time
# %%
# Clearly for this easy task one can do all in the very same python kernel
# BUT here we want to mimic the typical flow
# 1. MADX for optics matching/error seeding
# 2. Tracking for FMA and/or DA studies
# 3. simulation baby-sitting and
# 4. postprocessing
import numpy as np

# Random inputs mimicking scan parameters: two 50-sample vectors (summed
# pairwise) and a 10-sample vector of scale factors.
a = np.random.randn(50)
b = np.random.randn(50)
c = np.random.randn(10)

# For every scale factor, take sqrt(|(a+b)*scale|) element-wise, flatten
# all 10 batches into one list, and sort it ascending.
my_list_original = sorted(
    value
    for scale in c
    for value in np.sqrt(np.abs((a + b) * scale))
)
# %%
"""
#### The root of the tree
"""
start_time = time.time()
# %%
# Root of the study tree: every path below is anchored at the current
# working directory.
import os

my_folder = os.getcwd()
root = NodeJob(name='root', parent=None)
root.path = f'{my_folder}/study_000'
root.template_path = f'{my_folder}/templates'
root.log_file = f'{root.path}/log.json'
# %%
"""
#### First generation of nodes
"""
# %%
# First generation: one child of root per element of `a`, each cloned
# from the `sum_it` template and submitted through LSF's bsub.
for node in root.root.generation(0):
    node.children = [
        NodeJob(
            name=f"{child:03}",
            parent=node,
            path=f"{node.path}/{child:03}",
            template_path=root.template_path + '/sum_it',
            # submit_command = f'python {root.template_path}/sum_it/run.py &',
            submit_command=(
                f'bsub -q hpc_acc -e %J.err -o %J.out '
                f'cd {node.path}/{child:03} && '
                f'{root.template_path}/sum_it/run.sh &'
            ),
            log_file=f"{node.path}/{child:03}/log.json",
            dictionary={
                'a': float(a[child]),
                'b': float(b[child]),
            },
        )
        for child in range(len(a))
    ]

# To combine different lists one can use the product or the zip functions
# import itertools
# [[i, j, z] for i, j, z in itertools.product(['a','b'],['c','d'],[1,2,3])]
# [[i, j, z] for i, j, z in zip(['a','b'],['c','d'],[1,2,3])]
# %%
"""
#### Second generation of nodes
"""
# %%
# Second generation: every first-generation node gets one child per
# element of `c`, cloned from the `multiply_it` template.
for node in root.root.generation(1):
    node.children = [
        NodeJob(
            name=f"{child:03}",
            parent=node,
            path=f"{node.path}/{child:03}",
            template_path=f'{root.template_path}/multiply_it',
            # bsub -q hpc_acc -e %J.err -o %J.out cd $PWD && ./run.sh
            submit_command=(
                f'bsub -q hpc_acc -e %J.err -o %J.out '
                f'cd {node.path}/{child:03} && '
                f'{root.template_path}/multiply_it/run.sh &'
            ),
            # submit_command = f'python {root.template_path}/multiply_it/run.py &',
            log_file=f"{node.path}/{child:03}/log.json",
            dictionary={'c': float(c[child])},
        )
        for child in range(len(c))
    ]

# Persist the whole tree description to study_000/tree.json.
root.to_json()

print('Done with the tree creation.')
print("--- %s seconds ---" % (time.time() - start_time))
# %%
"""
### Cloning the templates of the nodes
From python objects we move the nodes to the file-system.
"""
# %%
# We map the pythonic tree in a >folder< tree
start_time = time.time()
root.clean_log()
root.rm_children_folders()
from joblib import Parallel, delayed

# Clone generation by generation so parents exist before their children.
for depth in range(root.height):
    # [x.clone_children() for x in root.generation(depth)]
    Parallel(n_jobs=8)(
        delayed(x.clone_children)() for x in root.generation(depth)
    )

# VERY IMPORTANT, tagging
root.tag_as('cloned')
print('The tree structure is moved to the file system.')
print("--- %s seconds ---" % (time.time() - start_time))
# %%
"""
Example of a chronjob
"""
# %%
import tree_maker
from tree_maker import NodeJob

# %%
# Load the tree back from its JSON description.
try:
    root = tree_maker.tree_from_json('./study_000/tree.json')
except Exception as e:
    print(e)
    print('Probably you forgot to edit the address of your json file...')
    # NOTE(review): if the load failed, `root` is undefined and the code
    # below raises NameError; consider exiting here instead.

if root.has_been('completed'):
    print('All descendants of root are completed!')
else:
    # Drive every pending node, then mark the whole study completed once
    # all descendants report completion.
    for node in root.descendants:
        node.smart_run()
    if all(descendant.has_been('completed') for descendant in root.descendants):
        root.tag_as('completed')
        print('All descendants of root are completed!')
# %%
"""
Example of a chronjob
"""
# %%
import tree_maker
from tree_maker import NodeJob
import pandas as pd
import awkward as ak
from joblib import Parallel, delayed
# from dask import dataframe as dd

# %%
# Load the tree back from its JSON description.
try:
    root = tree_maker.tree_from_json('./study_000/tree.json')
except Exception as e:
    print(e)
    print('Probably you forgot to edit the address of your json file...')
    # NOTE(review): if the load failed, `root` is undefined and the code
    # below raises NameError; consider exiting here instead.

my_list = []
if root.has_been('completed'):
    print('All descendants of root are completed!')
    # Collect the completion status of every leaf (generation 2) node.
    for node in root.generation(2):
        # os.system(f'bsub cd {node.path} && {node.path_template} ')
        # my_list.append(pd.read_parquet(f'{node.path}/test.parquet', columns=['x']).iloc[-1].x)
        my_list.append(node.has_been('completed'))
        # my_list.append(ak.from_parquet(f'{node.path}/test.parquet', columns=['x'])[-1,'x'])
    # Parallel(n_jobs=16)(delayed(node.has_been)('completed') for node in root.generation(2))
    # print(my_list)
else:
    print('Complete first all jobs')
# %%
"""
Example of a chronjob
"""
# %%
import tree_maker
from tree_maker import NodeJob
import pandas as pd
import awkward as ak
from joblib import Parallel, delayed
# from dask import dataframe as dd

# %%
# Load the tree back from its JSON description.
try:
    root = tree_maker.tree_from_json('./study_000/tree.json')
except Exception as e:
    print(e)
    print('Probably you forgot to edit the address of your json file...')
    # NOTE(review): if the load failed, `root` is undefined and the code
    # below raises NameError; consider exiting here instead.

my_list = []
if root.has_been('completed'):
    print('All descendants of root are completed!')
    # Read the last 'x' sample of each of the first 100 leaf nodes,
    # touching only the final parquet row group to keep the I/O cheap.
    for node in root.generation(2)[0:100]:
        # os.system(f'bsub cd {node.path} && {node.path_template} ')
        # my_list.append(pd.read_parquet(f'{node.path}/test.parquet', columns=['x']).iloc[-1].x)
        # ak.from_parquet('test.parquet', columns='x', row_groups=99)['x',-1]
        my_list.append(ak.from_parquet(f'{node.path}/test.parquet', columns=['x'], row_groups=99)['x', -1])
    # Parallel(n_jobs=16)(delayed(node.has_been)('completed') for node in root.generation(2))
    print(my_list)
else:
    print('Complete first all jobs')
# %%
"""
Example of a chronjob
"""
# %%
import tree_maker
from tree_maker import NodeJob
import pandas as pd
import awkward as ak
import os
# from dask import dataframe as dd

# %%
# Load the tree back from its JSON description.
try:
    root = tree_maker.tree_from_json('./study_000/tree.json')
except Exception as e:
    print(e)
    print('Probably you forgot to edit the address of your json file...')
    # NOTE(review): if the load failed, `root` is undefined and the code
    # below raises NameError; consider exiting here instead.

if root.has_been('completed'):
    print('All descendants of root are completed!')
    # Submit one postprocessing job per first-generation node.
    for node in root.generation(1):
        node.tag_as('postprocessing_submitted')
        node.submit_command = f'bsub -q hpc_acc {node.template_path}/postprocess.sh &'
        node.submit()
else:
    print('Complete first all jobs')
# This is my input
parent: '../sum_it' # this is the first element of the product
c: -1 # this is the second element of the product
log_file: './log.yaml'
\ No newline at end of file
{
"0": {
"tag": "started",
"unix_time": 1624890907618272000,
"human_time": "2021-06-28 16:35:07.618272"
},
"1": {
"tag": "completed",
"unix_time": 1624890908593553920,
"human_time": "2021-06-28 16:35:08.593554"
},
"2": {
"tag": "started",
"unix_time": 1624890995812024064,
"human_time": "2021-06-28 16:36:35.812024"
},
"3": {
"tag": "completed",
"unix_time": 1624890995928683008,
"human_time": "2021-06-28 16:36:35.928683"
},
"4": {
"tag": "started",
"unix_time": 1624891021181616128,
"human_time": "2021-06-28 16:37:01.181616"
},
"5": {
"tag": "completed",
"unix_time": 1624891021380608000,
"human_time": "2021-06-28 16:37:01.380608"
},
"6": {
"tag": "started",
"unix_time": 1624891070778615040,
"human_time": "2021-06-28 16:37:50.778615"
},
"7": {
"tag": "completed",
"unix_time": 1624891070982253056,
"human_time": "2021-06-28 16:37:50.982253"
},
"8": {
"tag": "started",
"unix_time": 1624891074472503808,
"human_time": "2021-06-28 16:37:54.472504"
},
"9": {
"tag": "completed",
"unix_time": 1624891074613457920,
"human_time": "2021-06-28 16:37:54.613458"
}
}
\ No newline at end of file
result: 2
import json
import numpy as np
import ruamel.yaml
import tree_maker

# Read this job's configuration (parent folder, factor c, log path).
with open('config.yaml', 'r') as file:
    yaml = ruamel.yaml.YAML()
    cfg = yaml.load(file)

# Read the parent job's output: its 'result' is the first factor.
with open(cfg['parent'] + '/output.yaml', 'r') as file:
    yaml = ruamel.yaml.YAML()
    parent_out = yaml.load(file)

tree_maker.tag_json.tag_it(cfg['log_file'], 'started')


def my_function(my_x, my_y):
    """Just a multiplication."""
    return my_x * my_y


# Multiply the parent's result by this node's factor and persist it.
result = my_function(parent_out['result'], cfg['c'])
with open('output.yaml', 'w') as fp:
    yaml = ruamel.yaml.YAML()
    yaml.dump({'result': result}, fp)

# Also write a chunked parquet file of random tracking-like columns so
# downstream examples have row groups to read back.
import pandas as pd
frame = pd.DataFrame(
    np.random.randn(100000, 6),
    columns=['x', 'xp', 'y', 'yp', 'z', 'zp'],
)
frame.to_parquet('test.parquet', row_group_size=1000)

tree_maker.tag_json.tag_it(cfg['log_file'], 'completed')
#!/bin/bash
# Activate the shared miniconda environment, then execute the
# multiply_it template's Python payload.
source /afs/cern.ch/eng/tracking-tools/python_installations/miniconda3/bin/activate
python /gpfs/gpfs/gpfs_maestro_home_new/hpc/sterbini/tree_maker/examples/002_example/templates/multiply_it/run.py
#initialdir = .
executable = run.sh
output = .output.txt
error = .err.txt
log = .log.txt
should_transfer_files = yes
when_to_transfer_output = on_exit
transfer_input_files = config.yaml, run.py
# The line below can be commented if necessary
#transfer_output_files = output.yaml
+JobFlavour = "espresso"
queue
File added
# This is my input
a: -1 # this is the first element of the sum
b: -1 # this is the second element of the sum
run_command: 'python run.py'
log_file: './log.yaml'
0:
tag: started
unix_time: 1624652829.310353
human_time: 2021-06-25 22:27:09.310386
1:
tag: completed
unix_time: 1624652829.318002
human_time: 2021-06-25 22:27:09.318006
result: -2
import glob
import awkward as ak
import numpy as np

# Average every child folder's parquet output and collect the means
# into a single summary parquet file.
child_dirs = sorted(glob.glob('0*'))
means = []
for child_dir in child_dirs:
    samples = ak.from_parquet(f'{child_dir}/test.parquet')
    means.append(np.mean(samples))

summary = ak.Array(means)
ak.to_parquet(summary, './summary.parquet')
#!/bin/bash
# Example LSF submission: bsub -q hpc_acc -e %J.err -o %J.out cd $PWD && ./run.sh
# Activate the shared miniconda environment, then run the sum_it
# template's postprocessing step.
source /afs/cern.ch/eng/tracking-tools/python_installations/miniconda3/bin/activate
python /gpfs/gpfs/gpfs_maestro_home_new/hpc/sterbini/tree_maker/examples/002_example/templates/sum_it/postprocess.py
import json
import numpy as np
import ruamel.yaml
import tree_maker

# Read this job's configuration (addends a and b, log path).
with open('config.yaml', 'r') as file:
    yaml = ruamel.yaml.YAML()
    cfg = yaml.load(file)

tree_maker.tag_json.tag_it(cfg['log_file'], 'started')


def my_function(my_x, my_y):
    """Just an addition."""
    return my_x + my_y


# Add the two configured numbers and persist the result for children.
result = my_function(cfg['a'], cfg['b'])
with open('output.yaml', 'w') as fp:
    yaml = ruamel.yaml.YAML()
    yaml.dump({'result': result}, fp)

tree_maker.tag_json.tag_it(cfg['log_file'], 'completed')
#!/bin/bash
# Example LSF submission: bsub -q hpc_acc -e %J.err -o %J.out cd $PWD && ./run.sh
# Activate the shared miniconda environment, then execute the sum_it
# template's Python payload.
source /afs/cern.ch/eng/tracking-tools/python_installations/miniconda3/bin/activate
python /gpfs/gpfs/gpfs_maestro_home_new/hpc/sterbini/tree_maker/examples/002_example/templates/sum_it/run.py
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment