Commit 249651aa authored by Dejan Golubovic

Delete incomplete examples

parent d08d6205
Showing with 0 additions and 2308 deletions
## Example - open-vaccine
### What is it about?
Create and run a pipeline by annotating notebook cells with the KALE JupyterLab extension.
Imported and adjusted from: https://github.com/kubeflow-kale/kale/tree/master/examples/openvaccine-kaggle-competition
### How to run?
- Open **open-vaccine/open-vaccine.ipynb** in your Notebook server
- In the left sidebar, open the Kubeflow Pipelines Deployment Panel
- Toggle Enable
- Select Experiment (existing or new)
- Write Pipeline name and Pipeline description
- Toggle off *HP Tuning with Katib*
- Click Compile and Run at the bottom of the page
- After successful compilation, click View
- Inspect and debug your pipeline via the pipeline logs; a programmatic status check is sketched below
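If you prefer to check the run from code, the KFP SDK can list runs in your namespace. A minimal sketch, assuming the snippet runs inside the Notebook server so `kfp.Client()` picks up the in-cluster credentials:

```python
import kfp

# Connect from inside the notebook server; credentials come from the cluster.
client = kfp.Client()

# Print the name and status of the most recent runs in your namespace.
for run in client.list_runs(page_size=10).runs or []:
    print(run.name, run.status)
```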
pandas
requests
tensorflow==2.3.0
FROM gitlab-registry.cern.ch/cloud/ciadm:209156b743a4f9133b739b3d63130d8c332ceaa4
# Setup kerberos (if model is located on EOS)
COPY krb5.conf /etc/krb5.conf
# Clone KFP
WORKDIR /
RUN git clone -b cern/v1.4.1 https://gitlab.cern.ch/ai-ml/pipelines.git
# Install kfp-server-api
WORKDIR /pipelines/backend/api/python_http_client
RUN pip3 install --upgrade "enum34==1.1.8" && \
pip3 install -U . --upgrade
# Install kfp
WORKDIR /pipelines/sdk/python
RUN pip3 install -U . --upgrade
# Install kubernetes python client
RUN pip3 install kubernetes
# Copy template and serving script
RUN mkdir -p /ml
COPY serve_model.py /ml/serve_model.py
COPY inference_service_template.yaml /ml/inference_service_template.yaml
WORKDIR /ml
apiVersion: "serving.kubeflow.org/v1alpha2"
kind: "InferenceService"
metadata:
  name: MODEL_NAME_TO_REPLACE
  namespace: NAMESPACE_TO_REPLACE
spec:
  default:
    predictor:
      tensorflow:
        storageUri: STORAGE_TO_REPLACE
; AD : This Kerberos configuration is for CERN's Active Directory realm.
;
; /etc/krb5.conf
[libdefaults]
default_realm = CERN.CH
ticket_lifetime = 25h
renew_lifetime = 120h
forwardable = true
proxiable = true
default_tkt_enctypes = arcfour-hmac-md5 aes256-cts aes128-cts des3-cbc-sha1 des-cbc-md5 des-cbc-crc
chpw_prompt = true
rdns = false
[appdefaults]
pam = {
external = true
krb4_convert = false
krb4_convert_524 = false
krb4_use_as_req = false
}
[domain_realm]
.cern.ch = CERN.CH
.fnal.gov = FNAL.GOV
.hep.man.ac.uk = HEP.MAN.AC.UK
.in2p3.fr = IN2P3.FR
# No default domain for KFKI.HU specified.
[realms]
# Start of puppet output for CERN.CH
CERN.CH = {
default_domain = cern.ch
kpasswd_server = cerndc.cern.ch
admin_server = cerndc.cern.ch
kdc = cerndc.cern.ch
v4_name_convert = {
host = {
rcmd = host
}
}
}
# Start of puppet output for FNAL.GOV
FNAL.GOV = {
default_domain = fnal.gov
admin_server = krb-fnal-admin.fnal.gov
kdc = krb-fnal-fcc3.fnal.gov:88
kdc = krb-fnal-2.fnal.gov:88
kdc = krb-fnal-3.fnal.gov:88
kdc = krb-fnal-1.fnal.gov:88
kdc = krb-fnal-4.fnal.gov:88
kdc = krb-fnal-enstore.fnal.gov:88
kdc = krb-fnal-fg2.fnal.gov:88
kdc = krb-fnal-cms188.fnal.gov:88
kdc = krb-fnal-cms204.fnal.gov:88
kdc = krb-fnal-d0online.fnal.gov:88
}
# Start of puppet output for HEP.MAN.AC.UK
HEP.MAN.AC.UK = {
default_domain = hep.man.ac.uk
kpasswd_server = afs4.hep.man.ac.uk
admin_server = afs4.hep.man.ac.uk
kdc = afs1.hep.man.ac.uk
kdc = afs2.hep.man.ac.uk
kdc = afs3.hep.man.ac.uk
kdc = afs4.hep.man.ac.uk
}
# Start of puppet output for IN2P3.FR
IN2P3.FR = {
default_domain = in2p3.fr
kpasswd_server = kerberos-admin.in2p3.fr
admin_server = kerberos-admin.in2p3.fr
kdc = kerberos-1.in2p3.fr
kdc = kerberos-2.in2p3.fr
kdc = kerberos-3.in2p3.fr
}
# Start of puppet output for KFKI.HU
KFKI.HU = {
admin_server = kerberos.kfki.hu
kdc = kerberos.kfki.hu
}
import argparse
import kfp
from kubernetes import config
import yaml
import kubernetes
def get_parser():
parser = argparse.ArgumentParser(description='Serving Params')
parser.add_argument('--model_name', type=str)
parser.add_argument('--model_path', type=str)
return parser
def edit_template(src, dst, args, ns):
with open(src, 'r') as f:
template = f.read()
template = template.replace('MODEL_NAME_TO_REPLACE', args.model_name)
template = template.replace('STORAGE_TO_REPLACE', args.model_path)
template = template.replace('NAMESPACE_TO_REPLACE', ns)
with open(dst, 'w') as f:
f.write(template)
def create_inferenceservice(client, yaml_filepath, ns):
CO_GROUP = "serving.kubeflow.org"
CO_VERSION = "v1alpha2"
CO_PLURAL = "inferenceservices"
API_VERSION = "%s/%s" % (CO_GROUP, CO_VERSION)
print('yaml_filepath')
print(yaml_filepath)
with open(yaml_filepath, 'r') as f:
infs_spec = yaml.load(f, Loader=yaml.FullLoader)
print('Template loaded for submission')
client.create_namespaced_custom_object(CO_GROUP, CO_VERSION,
ns, CO_PLURAL,
infs_spec)
parser = get_parser()
args = parser.parse_args()
namespace = kfp.Client().get_user_namespace()
print('args:')
print(args)
print('namespace:')
print(namespace)
edit_template(src='inference_service_template.yaml', \
dst='inference_service.yaml', \
args=args,
ns=namespace)
print('Template edited')
config.load_incluster_config()
print('Incluster config loaded')
k8s_co_client = kubernetes.client.CustomObjectsApi()
print('Client obtained')
create_inferenceservice(k8s_co_client, 'inference_service.yaml', namespace)
print('CRD created')
name: Serve TF model
description: |
  A Kubeflow Pipeline component to deploy a tf-serving service
metadata:
  labels:
    add-pod-env: 'true'
inputs:
  - name: model_name
    type: String
  - name: model_path
    type: GCSPath
implementation:
  container:
    image: gitlab-registry.cern.ch/ai-ml/kubeflow_images/serving:8
    command:
      - python3
      - /ml/serve_model.py
      - --model_name
      - inputValue: model_name
      - --model_path
      - inputValue: model_path
    env:
      KFP_POD_NAME: "{{pod.name}}"
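As a sketch of how this component could be wired into a pipeline with the KFP SDK; the file name `component.yaml`, the pipeline name, and the default argument values below are illustrative, not part of the original example:

```python
import kfp
from kfp import dsl, components

# Load the "Serve TF model" component defined above
# (the local file name 'component.yaml' is an assumption).
serve_op = components.load_component_from_file('component.yaml')

@dsl.pipeline(name='serve-only', description='Deploy an already-trained model')
def serve_pipeline(model_name: str = 'bikesw',
                   model_path: str = 'gs://your-bucket/path/to/export/bikesw'):
    # Creates one pipeline step that runs /ml/serve_model.py in the serving image.
    serve_op(model_name=model_name, model_path=model_path)

if __name__ == '__main__':
    kfp.compiler.Compiler().compile(serve_pipeline, 'serve_pipeline.yaml')
```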
FROM tensorflow/tensorflow:2.3.0-gpu
RUN pip install --upgrade pip
RUN pip install pathlib2
RUN pip install gsutil
ADD ml /ml
ENTRYPOINT ["python", "/ml/bikes_weather_limited.py"]
# training_component
# Copyright 2020 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import logging
import json
import time
import os
import tensorflow as tf
import pathlib2
import bwmodel.model as bwmodel
DEVELOP_MODE = False
NBUCKETS = 5 # for embeddings
NUM_EXAMPLES = 1000*1000 * 20 # assume 20 million examples
STRATEGY = tf.distribute.MirroredStrategy()
TRAIN_BATCH_SIZE = 64 * STRATEGY.num_replicas_in_sync
def create_model(learning_rate, hidden_size, num_hidden_layers):
inputs, sparse, real = bwmodel.get_layers()
logging.info('sparse keys: %s', sparse.keys())
logging.info('real keys: %s', real.keys())
model = None
print('num replicas...')
print(STRATEGY.num_replicas_in_sync)
with STRATEGY.scope(): # hmmm
model = bwmodel.wide_and_deep_classifier(
inputs,
linear_feature_columns=sparse.values(),
dnn_feature_columns=real.values(),
num_hidden_layers=num_hidden_layers,
dnn_hidden_units1=hidden_size,
learning_rate=learning_rate)
model.summary()
return model
def main():
logging.getLogger().setLevel(logging.INFO)
parser = argparse.ArgumentParser(description='ML Trainer')
parser.add_argument(
'--epochs', type=int, default=1)
parser.add_argument(
# e.g. {"num_hidden_layers": 3, "hidden_size": 96, "learning_rate": 0.01}
'--hptune-results', required=True)
parser.add_argument(
'--steps-per-epoch', type=int,
default=-1) # if set to -1, don't override the normal calcs for this
parser.add_argument(
'--hp-idx', type=int,
default=0)
parser.add_argument(
'--workdir', required=True)
parser.add_argument(
'--tb-dir', required=True)
parser.add_argument(
'--data-dir', default='gs://aju-dev-demos-codelabs/bikes_weather/')
parser.add_argument(
'--train-output-path', required=True)
parser.add_argument(
'--metrics-output-path', required=True)
args = parser.parse_args()
logging.info('Tensorflow version %s', tf.__version__)
logging.info('got hptune results: %s', args.hptune_results)
hptune_info = json.loads(str(args.hptune_results))
logging.info('hptune_info: %s', hptune_info)
# extract hptuning best params results
learning_rate = hptune_info[args.hp_idx]['learning_rate']
hidden_size = hptune_info[args.hp_idx]['hidden_size']
num_hidden_layers = hptune_info[args.hp_idx]['num_hidden_layers']
logging.info('using: learning rate %s, hidden size %s, first hidden layer %s',
learning_rate, hidden_size, num_hidden_layers)
TRAIN_DATA_PATTERN = args.data_dir + "train*"
EVAL_DATA_PATTERN = args.data_dir + "test*"
OUTPUT_DIR = '{}/bwmodel/trained_model'.format(args.workdir)
logging.info('Writing trained model to %s', OUTPUT_DIR)
train_batch_size = TRAIN_BATCH_SIZE
eval_batch_size = 1000
if args.steps_per_epoch == -1: # calc based on dataset size
steps_per_epoch = NUM_EXAMPLES // train_batch_size
else:
steps_per_epoch = args.steps_per_epoch
logging.info('using %s steps per epoch', steps_per_epoch)
train_dataset = bwmodel.read_dataset(TRAIN_DATA_PATTERN, train_batch_size)
eval_dataset = bwmodel.read_dataset(EVAL_DATA_PATTERN, eval_batch_size,
tf.estimator.ModeKeys.EVAL, eval_batch_size * 100 * STRATEGY.num_replicas_in_sync
)
# Create metadata.json file for Tensorboard 'artifact'
metadata = {
'outputs' : [{
'type': 'tensorboard',
'source': args.tb_dir
}]
}
with open('/mlpipeline-ui-metadata.json', 'w') as f:
json.dump(metadata, f)
model = create_model(learning_rate, hidden_size, num_hidden_layers)
checkpoint_path = '{}/checkpoints/bikes_weather.cpt'.format(OUTPUT_DIR)
logging.info("checkpoint path: %s", checkpoint_path)
cp_callback = tf.keras.callbacks.ModelCheckpoint(checkpoint_path,
save_weights_only=True,
verbose=1)
tb_callback = tf.keras.callbacks.TensorBoard(log_dir='{}/logs'.format(OUTPUT_DIR),
update_freq=20000)
logging.info("training model....")
history = model.fit(train_dataset,
validation_data=eval_dataset,
validation_steps=eval_batch_size,
epochs=args.epochs,
steps_per_epoch=steps_per_epoch,
callbacks=[cp_callback, tb_callback]
)
logging.info(history.history.keys())
# write metrics info dict
metrics_json = json.dumps(history.history)
print('metrics json: {}'.format(metrics_json))
try:
pathlib2.Path(args.metrics_output_path).parent.mkdir(parents=True)
except FileExistsError as e1:
logging.info(e1)
pathlib2.Path(args.metrics_output_path).write_text(metrics_json)
ts = str(int(time.time()))
export_dir = '{}/export/bikesw/{}'.format(OUTPUT_DIR, ts)
logging.info('Exporting to %s', export_dir)
try:
pathlib2.Path(args.train_output_path).parent.mkdir(parents=True)
except FileExistsError as e2:
logging.info(e2)
  export_path = '{}/export/bikesw'.format(OUTPUT_DIR)  # define before it is used in the copy command
  logging.info('export path: %s', export_path)
  try:
    logging.info("exporting model....")
    tf.saved_model.save(model, '/tmp/exported_model')
    # the SavedModel is a directory, so copy it recursively
    gsutil_cp = 'gsutil cp -r /tmp/exported_model ' + export_path
    os.system(gsutil_cp)
    logging.info("train_output_path: %s", args.train_output_path)
    pathlib2.Path(args.train_output_path).write_text(export_path)
  except Exception as e:  # retry once if error
    logging.warning(e)
    logging.info("retrying...")
    time.sleep(10)
    logging.info("again ... exporting model....")
    tf.saved_model.save(model, '/tmp/exported_model')
    gsutil_cp = 'gsutil cp -r /tmp/exported_model ' + export_path
    os.system(gsutil_cp)
    pathlib2.Path(args.train_output_path).write_text(export_path)
if __name__ == "__main__":
main()
# Copyright 2020 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import logging
import json
import time
import tensorflow as tf
from google.cloud import storage
from kerastuner.tuners import RandomSearch # , Hyperband
import bwmodel.model as bwmodel
DEVELOP_MODE = False
NBUCKETS = 5 # for embeddings
NUM_EXAMPLES = 1000*1000 * 20 # assume 20 million examples
STRATEGY = tf.distribute.MirroredStrategy()
TRAIN_BATCH_SIZE = 256 * STRATEGY.num_replicas_in_sync
def create_model(hp):
inputs, sparse, real = bwmodel.get_layers()
logging.info('sparse keys: %s', sparse.keys())
logging.info('real keys: %s', real.keys())
model = None
print('num replicas...')
print(STRATEGY.num_replicas_in_sync)
model = bwmodel.wide_and_deep_classifier(
inputs,
linear_feature_columns=sparse.values(),
dnn_feature_columns=real.values(),
num_hidden_layers=hp.Int('num_hidden_layers', 2, 5),
dnn_hidden_units1=hp.Int('hidden_size', 16, 256, step=32),
learning_rate=hp.Choice('learning_rate',
values=[5e-1, 1e-1, 1e-2, 1e-3, 1e-4])
)
model.summary()
return model
def main():
logging.getLogger().setLevel(logging.INFO)
parser = argparse.ArgumentParser(description='Keras Tuner HP search')
parser.add_argument(
'--epochs', type=int,
default=1)
parser.add_argument(
'--steps-per-epoch', type=int,
default=-1) # if set to -1, don't override the normal calcs for this
parser.add_argument(
'--tuner-proj',
required=True)
parser.add_argument(
'--bucket-name', required=True)
parser.add_argument(
'--tuner-dir',
required=True)
parser.add_argument(
'--tuner-num',
required=True)
parser.add_argument(
'--respath',
required=True)
parser.add_argument(
'--executions-per-trial', type=int,
default=2)
parser.add_argument(
'--max-trials', type=int,
default=20)
parser.add_argument(
'--num-best-hps', type=int,
default=2)
parser.add_argument(
'--data-dir',
default='gs://aju-dev-demos-codelabs/bikes_weather/')
args = parser.parse_args()
logging.info('Tensorflow version %s', tf.__version__)
TRAIN_DATA_PATTERN = args.data_dir + "train*"
EVAL_DATA_PATTERN = args.data_dir + "test*"
train_batch_size = TRAIN_BATCH_SIZE
eval_batch_size = 1000
if args.steps_per_epoch == -1: # calc based on dataset size
steps_per_epoch = NUM_EXAMPLES // train_batch_size
else:
steps_per_epoch = args.steps_per_epoch
logging.info('using %s steps per epoch', steps_per_epoch)
logging.info('using train batch size %s', train_batch_size)
train_dataset = bwmodel.read_dataset(TRAIN_DATA_PATTERN, train_batch_size)
eval_dataset = bwmodel.read_dataset(EVAL_DATA_PATTERN, eval_batch_size,
tf.estimator.ModeKeys.EVAL,
eval_batch_size * 100 * STRATEGY.num_replicas_in_sync
)
logging.info('executions per trial: %s', args.executions_per_trial)
# TODO: parameterize
retries = 0
num_retries = 5
sleep_time = 5
while retries < num_retries:
try:
tuner = RandomSearch(
# tuner = Hyperband(
create_model,
objective='val_mae',
# max_epochs=10,
# hyperband_iterations=2,
max_trials=args.max_trials,
distribution_strategy=STRATEGY,
executions_per_trial=args.executions_per_trial,
directory=args.tuner_dir,
project_name=args.tuner_proj
)
break
except Exception as e:
logging.warning(e)
logging.info('sleeping %s seconds...', sleep_time)
time.sleep(sleep_time)
retries += 1
sleep_time *= 2
logging.info("search space summary:")
logging.info(tuner.search_space_summary())
logging.info("hp tuning model....")
tuner.search(train_dataset,
validation_data=eval_dataset,
validation_steps=eval_batch_size,
epochs=args.epochs,
steps_per_epoch=steps_per_epoch,
)
best_hps = tuner.get_best_hyperparameters(args.num_best_hps)
best_hps_list = [best_hps[i].values for i in range(args.num_best_hps)]
logging.info('best_hps_list: %s', best_hps_list)
best_hp_values = json.dumps(best_hps_list)
logging.info('best hyperparameters: %s', best_hp_values)
storage_client = storage.Client()
logging.info('writing best results to %s', args.respath)
bucket = storage_client.get_bucket(args.bucket_name)
logging.info('using bucket %s: %s, path %s', args.bucket_name, bucket, args.respath)
blob = bucket.blob(args.respath)
blob.upload_from_string(best_hp_values)
# uncomment to also save best model from hp search
# OUTPUT_DIR = '{}/{}/{}/bwmodel/trained_model'.format(args.tuner_dir,
# args.tuner_proj, args.tuner_num)
# logging.info('Writing trained model to %s', OUTPUT_DIR)
# best_model = tuner.get_best_models(1)[0]
# logging.info('best model: %s', best_model)
# ts = str(int(time.time()))
# export_dir = '{}/export/bikesw/{}'.format(OUTPUT_DIR, ts)
# logging.info('Exporting to %s', export_dir)
# try:
# logging.info("exporting model....")
# tf.saved_model.save(best_model, export_dir)
# except Exception as e: # retry once if error
# logging.warning(e)
# logging.info("retrying...")
# time.sleep(10)
# logging.info("again ... exporting model....")
# tf.saved_model.save(best_model, export_dir)
if __name__ == "__main__":
main()
# Copyright 2020 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Adapted in part from:
# https://github.com/GoogleCloudPlatform/data-science-on-gcp/blob/master/09_cloudml/flights_model_tf2.ipynb
# by Valliappa Lakshmanan. (See that repo for more info about the accompanying book,
# "Data Science on the Google Cloud Platform", from O'Reilly.)
import tensorflow as tf
CSV_COLUMNS = ('duration,end_station_id,bike_id,ts,day_of_week,start_station_id' +
',start_latitude,start_longitude,end_latitude,end_longitude' +
',euclidean,loc_cross,prcp,max,min,temp,dewp').split(',')
LABEL_COLUMN = 'duration'
DEFAULTS = [[0.0], ['na'], ['na'], [0.0], ['na'], ['na'],
[0.0], [0.0], [0.0], [0.0],
[0.0], ['na'], [0.0], [0.0], [0.0], [0.0], [0.0]]
def load_dataset(pattern, batch_size=1):
return tf.data.experimental.make_csv_dataset(pattern, batch_size, CSV_COLUMNS, DEFAULTS)
def features_and_labels(features):
label = features.pop('duration') # this is what we will train for
features.pop('bike_id')
return features, label
def read_dataset(pattern, batch_size, mode=tf.estimator.ModeKeys.TRAIN, truncate=None):
dataset = load_dataset(pattern, batch_size)
dataset = dataset.map(features_and_labels, num_parallel_calls=tf.data.experimental.AUTOTUNE)
if mode == tf.estimator.ModeKeys.TRAIN:
dataset = dataset.repeat().shuffle(batch_size*10)
# dataset = dataset.repeat()
dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
# dataset = dataset.prefetch(1)
if truncate is not None:
dataset = dataset.take(truncate)
return dataset
def get_layers():
# duration,end_station_id,bike_id,ts,day_of_week,start_station_id,start_latitude,start_longitude,end_latitude,end_longitude,
# euclidean,loc_cross,prcp,max,min,temp,dewp
real = {
colname : tf.feature_column.numeric_column(colname)
for colname in
# ('ts,start_latitude,start_longitude,end_latitude,end_longitude,euclidean,prcp,max,min,temp,dewp').split(',')
# ('ts,euclidean,prcp,max,min,temp,dewp').split(',')
('euclidean,prcp,max,min,temp,dewp').split(',')
}
sparse = {
'day_of_week': tf.feature_column.categorical_column_with_vocabulary_list('day_of_week',
vocabulary_list='1,2,3,4,5,6,7'.split(',')),
'end_station_id' : tf.feature_column.categorical_column_with_hash_bucket(
'end_station_id', hash_bucket_size=800),
'start_station_id' : tf.feature_column.categorical_column_with_hash_bucket(
'start_station_id', hash_bucket_size=800),
'loc_cross' : tf.feature_column.categorical_column_with_hash_bucket(
'loc_cross', hash_bucket_size=21000),
# 'bike_id' : tf.feature_column.categorical_column_with_hash_bucket('bike_id', hash_bucket_size=14000)
}
inputs = {
colname : tf.keras.layers.Input(name=colname, shape=(), dtype='float32')
for colname in real.keys()
}
inputs.update({'ts': tf.keras.layers.Input(name='ts', shape=(), dtype='float64')})
inputs.update({
colname : tf.keras.layers.Input(name=colname, shape=(), dtype='string')
for colname in sparse.keys()
})
# embed all the sparse columns
embed = {
'embed_{}'.format(colname) : tf.feature_column.embedding_column(col, 10)
for colname, col in sparse.items()
}
real.update(embed)
# one-hot encode the sparse columns
sparse = {
colname : tf.feature_column.indicator_column(col)
for colname, col in sparse.items()
}
return inputs, sparse, real
# Build a wide-and-deep model.
def wide_and_deep_classifier(inputs, linear_feature_columns, dnn_feature_columns,
num_hidden_layers, dnn_hidden_units1, learning_rate):
deep = tf.keras.layers.DenseFeatures(dnn_feature_columns, name='deep_inputs')(inputs)
layers = [dnn_hidden_units1]
if num_hidden_layers > 1:
layers += [int(dnn_hidden_units1/(x*2)) for x in range(1, num_hidden_layers)]
for layerno, numnodes in enumerate(layers):
deep = tf.keras.layers.Dense(numnodes, activation='relu', name='dnn_{}'.format(layerno+1))(deep)
wide = tf.keras.layers.DenseFeatures(linear_feature_columns, name='wide_inputs')(inputs)
both = tf.keras.layers.concatenate([deep, wide], name='both')
output = tf.keras.layers.Dense(1, name='dur')(both)
model = tf.keras.Model(inputs, output)
optimizer = tf.keras.optimizers.RMSprop(learning_rate)
model.compile(loss='mse', optimizer=optimizer,
metrics=['mse', 'mae', tf.keras.metrics.RootMeanSquaredError()])
return model
# Copyright 2020 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import logging
import os
import subprocess
import time
from google.cloud import storage
OUTPUT_PATH = '/tmp/hps.json'
def main():
parser = argparse.ArgumentParser(description='Keras distributed tuner')
parser.add_argument(
'--epochs', type=int, required=True)
parser.add_argument(
'--num-tuners', type=int, required=True)
parser.add_argument(
'--bucket-name', required=True)
parser.add_argument(
'--tuner-dir', required=True)
parser.add_argument(
'--tuner-proj', required=True)
parser.add_argument(
'--max-trials', type=int, required=True)
parser.add_argument(
'--namespace', default='default')
parser.add_argument(
'--executions-per-trial', type=int,
default=2)
parser.add_argument(
'--num-best-hps', type=int,
default=2)
parser.add_argument('--deploy', default=False, action='store_true')
parser.add_argument('--no-deploy', dest='deploy', action='store_false')
args = parser.parse_args()
logging.getLogger().setLevel(logging.INFO)
tuner_path = 'gs://{}/{}'.format(args.bucket_name, args.tuner_dir)
res_path = '{}/{}/{}'.format(args.bucket_name, args.tuner_dir, 'bp.txt')
logging.info('tuner path: %s, res path %s', tuner_path, res_path)
logging.info('Generating tuner deployment templates.')
ts = int(time.time())
KTUNER_CHIEF = 'ktuner{}-chief'.format(ts)
logging.info('KTUNER_CHIEF: %s', KTUNER_CHIEF)
KTUNER_DEP_PREFIX = 'ktuner{}-dep'.format(ts)
logging.info('KTUNER_DEP_PREFIX: %s', KTUNER_DEP_PREFIX)
template_file = os.path.join(
os.path.dirname(os.path.realpath(__file__)), 'kchief_deployment_templ.yaml')
chief_file_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'kchief_dep.yaml')
with open(template_file, 'r') as f:
with open(chief_file_path, "w") as target:
data = f.read()
changed = data.replace('EPOCHS', str(args.epochs)).replace(
'TUNER_DIR', tuner_path).replace('NAMESPACE', args.namespace).replace(
'TUNER_PROJ', args.tuner_proj).replace('TUNER_NUM', 'chief').replace(
'MAX_TRIALS', str(args.max_trials)).replace(
'KTUNER_CHIEF', KTUNER_CHIEF).replace('RES_PATH', res_path).replace(
'BUCKET_NAME', args.bucket_name).replace('NUM_BEST_HPS', str(args.num_best_hps)).replace(
'EXECS_PER_TRIAL', str(args.executions_per_trial))
target.write(changed)
tuner_file_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'ktuners_dep.yaml')
logging.info("tuner file path: %s", tuner_file_path)
if os.path.exists(tuner_file_path):
os.remove(tuner_file_path)
template_file = os.path.join(
os.path.dirname(os.path.realpath(__file__)), 'ktuners_deployment_templ.yaml')
logging.info("num tuners: %s", args.num_tuners)
with open(template_file, 'r') as f:
with open(tuner_file_path, "a") as target:
data = f.read()
for i in range(args.num_tuners):
changed = data.replace('EPOCHS', str(args.epochs)).replace(
'TUNER_DIR', tuner_path).replace('NAMESPACE', args.namespace).replace(
'TUNER_PROJ', args.tuner_proj).replace('TUNER_NUM', str(i)).replace(
'KTUNER_CHIEF', KTUNER_CHIEF).replace(
'MAX_TRIALS', str(args.max_trials)).replace('RES_PATH', res_path).replace(
'BUCKET_NAME', args.bucket_name).replace(
'NUM_BEST_HPS', str(args.num_best_hps)).replace(
'EXECS_PER_TRIAL', str(args.executions_per_trial))
changed = changed.replace(
'KTUNER_DEP_NAME', KTUNER_DEP_PREFIX +'{}'.format(i)).replace(
'KTUNER_ID', 'tuner{}'.format(i))
target.write(changed)
if args.deploy:
logging.info('deploying chief...')
subprocess.run(['kubectl', 'apply', '-f', chief_file_path])
logging.info('pausing before tuner worker deployment...')
time.sleep(120)
logging.info('deploying tuners...')
subprocess.run(['kubectl', 'apply', '-f', tuner_file_path])
logging.info('finished deployments.')
# wait for the tuner pods to be ready... if we're autoscaling the GPU pool,
# this might take a while.
for i in range(args.num_tuners):
logging.info('waiting for tuner %s pod to be ready...', i)
subprocess.run(['kubectl', '-n={}'.format(args.namespace), 'wait', 'pod',
'--for=condition=ready', '--timeout=15m',
'-l=job-name={}{}'.format(KTUNER_DEP_PREFIX, i)])
# wait for all the tuner workers to complete
for i in range(args.num_tuners):
logging.info('waiting for completion of tuner %s...', i)
# negative timeout value --> one week
sp_res = subprocess.run(['kubectl', '-n={}'.format(args.namespace), 'wait',
'--for=condition=complete', '--timeout=-1m', 'job/{}{}'.format(KTUNER_DEP_PREFIX, i)],
capture_output=True)
# In some cases the k8s api seems to be temporarily unavailable, which causes the
# 'wait' to terminate prematurely. TODO: What's the best way to address this?
if 'The connection to the server' in str(sp_res.stdout):
# then try again after a pause
logging.info('got connection error; sleeping for 20s')
time.sleep(20)
logging.info('waiting again for completion of tuner %s...', i)
sp_res = subprocess.run(['kubectl', '-n={}'.format(args.namespace), 'wait',
'--for=condition=complete', '--timeout=-1m', 'job/{}{}'.format(KTUNER_DEP_PREFIX, i)],
capture_output=True)
# get info on the best params once search has completed
client = storage.Client()
bucket = client.get_bucket(args.bucket_name)
logging.info('using bucket %s: %s, path %s', args.bucket_name, bucket, res_path)
blob = bucket.get_blob(res_path)
results_string = blob.download_as_string()
logging.info('got results info: %s', results_string)
rs = results_string.decode("utf-8")
logging.info('rs: %s', rs)
with open(OUTPUT_PATH, 'w') as f:
f.write(rs)
if __name__ == "__main__":
main()
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import NamedTuple
# from kfp.components import InputPath, OutputPath
# An example of how the model eval info could be used to make decisions about whether or not
# to deploy the model.
def eval_metrics(
metrics: str,
thresholds: str
) -> NamedTuple('Outputs', [('deploy', str)]):
import json
import logging
def regression_threshold_check(metrics_info):
# ...
for k, v in thresholds_dict.items():
logging.info('k {}, v {}'.format(k, v))
if k in ['root_mean_squared_error', 'mae']:
if metrics_info[k][-1] > v:
          logging.info('{} > {}; returning False'.format(metrics_info[k][-1], v))
return ('False', )
return ('deploy', )
logging.getLogger().setLevel(logging.INFO) # TODO: make level configurable
thresholds_dict = json.loads(thresholds)
logging.info('thresholds dict: {}'.format(thresholds_dict))
logging.info('metrics: %s', metrics)
metrics_dict = json.loads(metrics)
logging.info("got metrics info: %s", metrics_dict)
res = regression_threshold_check(metrics_dict)
logging.info('deploy decision: %s', res)
return res
if __name__ == '__main__':
import kfp
kfp.components.func_to_container_op(eval_metrics,
output_component_file='../../eval_metrics_component.yaml', base_image='gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest')
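For reference, a hedged sketch of calling `eval_metrics` directly; the metrics string mirrors a Keras `history.history` dict and the threshold values are made up for illustration:

```python
import json

# Hypothetical inputs: the last-epoch MAE and RMSE are compared against the thresholds.
metrics = json.dumps({'mae': [3.1, 2.2], 'root_mean_squared_error': [4.4, 3.0]})
thresholds = json.dumps({'mae': 2.5, 'root_mean_squared_error': 3.5})

print(eval_metrics(metrics, thresholds))  # both metrics below threshold -> ('deploy',)
```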