Skip to content
Snippets Groups Projects
Commit e2e737f3 authored by Dejan Golubovic's avatar Dejan Golubovic
Browse files

Add kfp eos example

parent be13af84
No related branches found
No related tags found
No related merge requests found
......@@ -12,7 +12,7 @@ Store intermediate data on personal EOS.
### How to run?
- Open **mnist-kfp/mnist-kfp-eos.ipynb** in your Notebook server
- Open **mnist-kfp/mnist-kfp.ipynb** in your Notebook server
- Run all the cells
- Download created pipeline .yaml file
- Open Pipelines
......@@ -21,4 +21,21 @@ Store intermediate data on personal EOS.
- Click Create Run
- The experiment should finish as Successful
Current issue with EOS: it can be fixed by reading the local credentials and passing them to every cell.
\ No newline at end of file
## Example - mnist-kfp-eos
### What is it about?
Same as above, only with EOS access.
### How to run?
- Open a notebook terminal
- Authenticate with Kerberos
- `kinit <cernid>`
- Whenever the Kerberos ticket has been refreshed, remove any old secret before creating a new one
- `kubectl delete secret krb-secret`
- Create a Kerberos secret for Kubernetes
- `kubectl create secret generic krb-secret --from-file=/tmp/krb5cc_1000`
- Open **mnist-kfp/mnist-kfp-eos.ipynb** in your Notebook server
- Run the cells
%% Cell type:markdown id: tags:
# MNIST classification
%% Cell type:markdown id: tags:
### Imports for Compilation
%% Cell type:code id: tags:
``` python
import kfp
from kfp.components import func_to_container_op, InputPath, OutputPath
from kfp import dsl
from kubernetes import client as k8s_client
import yaml
```
%% Cell type:markdown id: tags:
### Read Data Function
%% Cell type:code id: tags:
``` python
def read_data(output_text_path: OutputPath(str), base_image='a'):
    """Load MNIST, rescale the images and pack the four arrays into a zip file.

    The images get a trailing channel axis and are divided by 255.0. Each
    array is first saved as a ``.npy`` file in the current working directory
    and then added to the archive written to ``output_text_path``.

    All imports stay inside the function: per the KFP lightweight-component
    documentation the function has to be stand-alone to be packaged by
    ``func_to_container_op``.

    Args:
        output_text_path: Destination path of the zip archive.
        base_image: Not used inside the function body; kept only so the
            signature (and the generated component interface) is unchanged.
    """
    import tensorflow as tf
    import numpy as np
    import os
    from zipfile import ZipFile

    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

    # Rescale the images from [0,255] to the [0.0,1.0] range.
    x_train, x_test = x_train[..., np.newaxis]/255.0, x_test[..., np.newaxis]/255.0

    # Diagnostics written to the step log: working directory, its content
    # and the user the container runs as.
    os.system('pwd')
    os.system('ls -la')
    os.system('whoami')

    # Single source of truth for what is saved and what is archived.
    arrays = (
        ('xtrain.npy', x_train),
        ('ytrain.npy', y_train),
        ('xtest.npy', x_test),
        ('ytest.npy', y_test),
    )
    for filename, array in arrays:
        np.save(filename, array)

    # The context manager closes the archive even if a write fails.
    with ZipFile(output_text_path, 'w') as archive:
        for filename, _ in arrays:
            archive.write(filename)
```
%% Cell type:markdown id: tags:
### Read Data Pipeline Component
%% Cell type:code id: tags:
``` python
# Wrap read_data with func_to_container_op so it can be used as a pipeline
# step; the step runs in the tensorflow-notebook-gpu-2.1.0 image given below.
read_data_comp = func_to_container_op(
func=read_data,
base_image='gitlab-registry.cern.ch/ai-ml/kubeflow_images/tensorflow-notebook-gpu-2.1.0:v0.6.1-33'
)
```
%% Cell type:markdown id: tags:
### Preprocess Data Function
%% Cell type:code id: tags:
``` python
def preprocess_data(text_path: InputPath(), output_text_path: OutputPath()):
    """Keep only the digits 3 and 6 and turn the labels into booleans.

    The input archive is extracted into the current working directory, the
    train and test sets are filtered, and the filtered arrays are saved as
    ``*_filtered.npy`` files and packed into a new zip archive.

    Imports stay inside the function: per the KFP lightweight-component
    documentation the function has to be stand-alone.

    Args:
        text_path: Zip archive containing ``xtrain.npy``, ``ytrain.npy``,
            ``xtest.npy`` and ``ytest.npy``.
        output_text_path: Destination path of the zip archive holding the
            filtered arrays.
    """
    import numpy as np
    from zipfile import ZipFile

    with ZipFile(text_path, 'r') as archive:
        archive.extractall()

    # Load data
    x_train = np.load('xtrain.npy')
    y_train = np.load('ytrain.npy')
    x_test = np.load('xtest.npy')
    y_test = np.load('ytest.npy')

    # Filter 3 and 6
    def filter_36(x, y):
        keep = (y == 3) | (y == 6)
        x, y = x[keep], y[keep]
        # Binary target: True for a 3, False for a 6.
        y = y == 3
        return x, y

    print("Number of unfiltered training examples:", len(x_train))
    print("Number of unfiltered test examples:", len(x_test))

    x_train, y_train = filter_36(x_train, y_train)
    x_test, y_test = filter_36(x_test, y_test)

    print("Number of filtered training examples:", len(x_train))
    print("Number of filtered test examples:", len(x_test))

    # Save modified data
    filtered = (
        ('xtrain_filtered.npy', x_train),
        ('ytrain_filtered.npy', y_train),
        ('xtest_filtered.npy', x_test),
        ('ytest_filtered.npy', y_test),
    )
    for filename, array in filtered:
        np.save(filename, array)

    # The context manager closes the archive even if a write fails.
    with ZipFile(output_text_path, 'w') as archive:
        for filename, _ in filtered:
            archive.write(filename)
```
%% Cell type:markdown id: tags:
### Preprocess Data Pipeline Component
%% Cell type:code id: tags:
``` python
# Wrap preprocess_data with func_to_container_op so it can be used as a
# pipeline step; the step runs in the tensorflow-notebook-gpu-2.1.0 image below.
preprocess_data_comp = func_to_container_op(
func=preprocess_data,
base_image='gitlab-registry.cern.ch/ai-ml/kubeflow_images/tensorflow-notebook-gpu-2.1.0:v0.6.1-33'
)
```
%% Cell type:markdown id: tags:
### Train Full Model Function
%% Cell type:code id: tags:
``` python
def model_full(text_path: InputPath(), output_text_path: OutputPath()):
    """Train a small convolutional network for one epoch and store its scores.

    The filtered data set is extracted from ``text_path`` into the current
    working directory. The network ends in a single-unit dense layer and is
    trained with binary cross-entropy on logits. Whatever
    ``model.evaluate`` returns on the test split is written, as text followed
    by a newline, to ``output_text_path``.

    Args:
        text_path: Zip archive with the ``*_filtered.npy`` arrays.
        output_text_path: Text file receiving the evaluation result.
    """
    # A simple model based off LeNet from https://keras.io/examples/mnist_cnn/
    import tensorflow as tf
    from zipfile import ZipFile
    import numpy as np

    layers = tf.keras.layers
    cnn = tf.keras.Sequential([
        layers.Conv2D(32, [3, 3], activation='relu', input_shape=(28, 28, 1)),
        layers.Conv2D(64, [3, 3], activation='relu'),
        layers.MaxPooling2D(pool_size=(2, 2)),
        layers.Dropout(0.25),
        layers.Flatten(),
        layers.Dense(128, activation='relu'),
        layers.Dropout(0.5),
        layers.Dense(1),
    ])
    cnn.compile(
        loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.Adam(),
        metrics=['accuracy'],
    )
    cnn.summary()

    with ZipFile(text_path, 'r') as archive:
        archive.extractall()

    # Load data
    x_train, y_train, x_test, y_test = [
        np.load(filename)
        for filename in (
            'xtrain_filtered.npy',
            'ytrain_filtered.npy',
            'xtest_filtered.npy',
            'ytest_filtered.npy',
        )
    ]

    cnn.fit(
        x_train,
        y_train,
        batch_size=128,
        epochs=1,
        verbose=1,
        validation_data=(x_test, y_test),
    )
    scores = cnn.evaluate(x_test, y_test)

    with open(output_text_path, 'w') as results_file:
        results_file.write('{}\n'.format(scores))
```
%% Cell type:markdown id: tags:
### Train Full Model Pipeline Component
%% Cell type:code id: tags:
``` python
# Wrap model_full with func_to_container_op so it can be used as a pipeline
# step; the step runs in the tensorflow-notebook-gpu-2.1.0 image given below.
model_full_comp = func_to_container_op(
func=model_full,
base_image='gitlab-registry.cern.ch/ai-ml/kubeflow_images/tensorflow-notebook-gpu-2.1.0:v0.6.1-33'
)
```
%% Cell type:markdown id: tags:
### Train Fair Model Function
%% Cell type:code id: tags:
``` python
def model_fair(text_path: InputPath(), output_text_path: OutputPath()):
    """Train a tiny dense network for one epoch and store its scores.

    The network is Flatten -> Dense(2, relu) -> Dense(1), trained with binary
    cross-entropy on logits. The filtered data set is extracted from
    ``text_path`` into the current working directory, and whatever
    ``model.evaluate`` returns on the test split is written, as text followed
    by a newline, to ``output_text_path``.

    Args:
        text_path: Zip archive with the ``*_filtered.npy`` arrays.
        output_text_path: Text file receiving the evaluation result.
    """
    import tensorflow as tf
    from zipfile import ZipFile
    import numpy as np

    layers = tf.keras.layers
    dense_net = tf.keras.Sequential([
        layers.Flatten(input_shape=(28, 28, 1)),
        layers.Dense(2, activation='relu'),
        layers.Dense(1),
    ])
    dense_net.compile(
        loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.Adam(),
        metrics=['accuracy'],
    )
    dense_net.summary()

    with ZipFile(text_path, 'r') as archive:
        archive.extractall()

    # Load data
    x_train, y_train, x_test, y_test = [
        np.load(filename)
        for filename in (
            'xtrain_filtered.npy',
            'ytrain_filtered.npy',
            'xtest_filtered.npy',
            'ytest_filtered.npy',
        )
    ]

    dense_net.fit(
        x_train,
        y_train,
        batch_size=128,
        epochs=1,
        verbose=1,
        validation_data=(x_test, y_test),
    )
    scores = dense_net.evaluate(x_test, y_test)

    with open(output_text_path, 'w') as results_file:
        results_file.write('{}\n'.format(scores))
```
%% Cell type:markdown id: tags:
### Train Fair Model Pipeline Component
%% Cell type:code id: tags:
``` python
# Wrap model_fair with func_to_container_op so it can be used as a pipeline
# step; the step runs in the tensorflow-notebook-gpu-2.1.0 image given below.
model_fair_comp = func_to_container_op(
func=model_fair,
base_image='gitlab-registry.cern.ch/ai-ml/kubeflow_images/tensorflow-notebook-gpu-2.1.0:v0.6.1-33'
)
```
%% Cell type:markdown id: tags:
### Evaluate Models Function
%% Cell type:code id: tags:
``` python
def models_evaluate(text_path_0: InputPath(), text_path_1: InputPath()):
    """Print the stored evaluation results of both models to stdout.

    Each result file is printed verbatim under a ``model N:`` heading. If a
    file does not end with a newline, one is added so that the next heading
    always starts on its own line.

    Args:
        text_path_0: Text file with the results printed under ``model 0:``.
        text_path_1: Text file with the results printed under ``model 1:``.
    """
    labelled_results = (
        ('model 0:', text_path_0),
        ('model 1:', text_path_1),
    )
    for heading, results_path in labelled_results:
        print(heading)
        with open(results_path, 'r') as reader:
            contents = reader.read()
        # Terminate the output only when the file itself did not; an empty
        # file prints nothing at all.
        needs_newline = bool(contents) and not contents.endswith('\n')
        print(contents, end='\n' if needs_newline else '')
```
%% Cell type:markdown id: tags:
### Evaluate Models Pipeline Component
%% Cell type:code id: tags:
``` python
# Wrap models_evaluate with func_to_container_op so it can be used as a
# pipeline step; the step runs in the tensorflow-notebook-gpu-2.1.0 image below.
models_evaluate_comp = func_to_container_op(
func=models_evaluate,
base_image='gitlab-registry.cern.ch/ai-ml/kubeflow_images/tensorflow-notebook-gpu-2.1.0:v0.6.1-33'
)
```
%% Cell type:markdown id: tags:
### Create Pipeline
%% Cell type:code id: tags:
``` python
# Kubernetes secret 'krb-secret', mounted into each step at
# /secret/krb-secret-vol.
krb_secret = k8s_client.V1SecretVolumeSource(secret_name='krb-secret')
krb_secret_volume = k8s_client.V1Volume(name='krb-secret-vol', secret=krb_secret)
krb_secret_volume_mount = k8s_client.V1VolumeMount(
    name=krb_secret_volume.name,
    mount_path='/secret/krb-secret-vol',
)

# Host path /var/eos, mounted into each step at /eos.
eos_host_path = k8s_client.V1HostPathVolumeSource(path='/var/eos')
eos_volume = k8s_client.V1Volume(name='eos', host_path=eos_host_path)
eos_volume_mount = k8s_client.V1VolumeMount(name=eos_volume.name, mount_path='/eos')


def _with_eos_access(op):
    """Attach the Kerberos secret volume and the EOS volume to a pipeline step.

    Args:
        op: The step returned by calling a component inside the pipeline.

    Returns:
        The value returned by the last ``add_volume_mount`` call, so the
        result can be used exactly like the step itself (e.g. ``.output``).
    """
    return op \
        .add_volume(krb_secret_volume) \
        .add_volume_mount(krb_secret_volume_mount) \
        .add_volume(eos_volume) \
        .add_volume_mount(eos_volume_mount)


# Pipeline: read -> preprocess -> (full model, fair model) -> evaluate.
# Every step gets the same two volumes through _with_eos_access.
@dsl.pipeline(
    name='test-eos-kfp',
    description='TEST EOS).'
)
def ml_pipeline_first():
    data_dir = _with_eos_access(read_data_comp())
    new_dir = _with_eos_access(preprocess_data_comp(data_dir.output))

    # Both models are trained on the same preprocessed archive.
    cnn_res = _with_eos_access(model_full_comp(new_dir.output))
    fairnn_res = _with_eos_access(model_fair_comp(new_dir.output))

    _with_eos_access(models_evaluate_comp(cnn_res.output, fairnn_res.output))
```
%% Cell type:markdown id: tags:
### Compile Pipeline
%% Cell type:code id: tags:
``` python
# Names for the uploaded pipeline, the compiled package file and the experiment.
pipeline_name = 'example_kfp_pipeline_20'
pipeline_file = pipeline_name + '.yaml'
experiment_name = 'example_kfp_experiment'
# Client object; it is used later by the upload/run cell.
client = kfp.Client()
# Compile ml_pipeline_first into pipeline_file.
# NOTE(review): Compiler.compile() presumably returns None, in which case
# `workflow` carries no information - confirm against the kfp version in use.
workflow = kfp.compiler.Compiler().compile(ml_pipeline_first, pipeline_file)
```
%% Cell type:markdown id: tags:
### Function for Accessing EOS
%% Cell type:code id: tags:
``` python
def post_process(pipeline_file, outfile):
    """Patch a compiled pipeline so each container step copies the Kerberos cache.

    For every template that has a ``container`` section, two shell lines are
    inserted into the third entry of the container command: one copies
    ``krb5cc_1000`` from the mounted secret to ``/tmp`` and one restricts its
    permissions to 600. Templates that already contain the copy line are left
    untouched, so running the function again on its own output (e.g. when the
    notebook cell is re-executed without recompiling) does not duplicate the
    lines.

    NOTE(review): this assumes ``command[2]`` is the multi-line shell script
    generated for the component - confirm for the kfp version in use.

    Args:
        pipeline_file: Path of the compiled pipeline YAML to read.
        outfile: Path the patched YAML is written to; may be the same file.
    """
    with open(pipeline_file, "r") as stream:
        pip_dict = yaml.safe_load(stream)

    copy_command = 'cp /secret/krb-secret-vol/krb5cc_1000 /tmp/krb5cc_1000'
    chmod_command = 'chmod 600 /tmp/krb5cc_1000'

    for template in pip_dict['spec']['templates']:
        if 'container' not in template:
            continue

        command = template['container']['command']
        script_lines = command[2].split('\n')
        if copy_command in script_lines:
            # Already patched by an earlier call.
            continue

        # Insert both lines starting at index 2 of the script.
        # To check EOS access while debugging, a line such as
        # 'ls -l /eos/user/d/dgolubov' can be added to this list as well.
        script_lines[2:2] = [copy_command, chmod_command]
        command[2] = '\n'.join(script_lines)

    # Separate name for the handle so the `outfile` parameter is not shadowed.
    with open(outfile, 'w') as out_stream:
        yaml.dump(pip_dict, out_stream, default_flow_style=False)
```
%% Cell type:markdown id: tags:
### Apply Access to EOS
%% Cell type:code id: tags:
``` python
# Patch the compiled pipeline in place: the same path is passed as input and output.
post_process(pipeline_file, pipeline_file)
```
%% Cell type:markdown id: tags:
### Upload and Run Pipeline
%% Cell type:code id: tags:
``` python
# Upload the patched package under pipeline_name.
# NOTE(review): presumably this fails when a pipeline with the same name
# already exists - confirm, and change pipeline_name before re-running if so.
client.upload_pipeline(pipeline_file, pipeline_name)
# Create the experiment, then start a run in it from the local package file.
# NOTE(review): the second argument of run_pipeline looks like the run/job
# name rather than a reference to the uploaded pipeline - confirm.
exp = client.create_experiment(name=experiment_name)
run = client.run_pipeline(exp.id, pipeline_name, pipeline_file)
```
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment