Skip to content
Snippets Groups Projects

automatic database register from google sheet data

Merged Zhicai Zhang requested to merge zhicaiz/reporting:zz_autoDB into master
Compare and
12 files
+ 531
117
Compare changes
  • Side-by-side
  • Inline
Files
12
+ 262
0
#!/usr/bin/env python
import os, sys, time
import argparse
import itkdb
from database import pwbdb
from database import pwbdbtools
from database import transitions
from database import pbv3_panel_smdload
from database import pbv3_panel_transition
from database import pbv3_panel_loadcoils
from database import pbv3_panel_loadshieldbox
from database import pbv3_panel_singulate
from database import pbv3_panel_loadhvmux
from database import pbv3_panel_loadamac
import gspread
"""
Google sheet API has a quota limit of 60 reads and 60 writes per mininute per user per project. To not exceed that limit, sleep 1 second after every read/write access
"""
def updateSheet(sheet, irow, panel, panelid, client, checkTests = False):
cell_ready = sheet.find("Ready for DB register?")
time.sleep(1)
cell_currentStage = sheet.find("Stage in DB")
time.sleep(1)
cell_currentLocation = sheet.find("Current Location")
time.sleep(1)
cell_shipmentDestination = sheet.find("Shipment destination")
time.sleep(1)
cell_passTests = sheet.find("#PBs pass all tests")
time.sleep(1)
cell_dbLink = sheet.find("DB link")
time.sleep(1)
if cell_ready is None or cell_currentStage is None or cell_currentLocation is None or cell_shipmentDestination is None or cell_passTests is None or cell_dbLink is None:
return
dbLink = "https://itkpd-test.unicorncollege.cz/componentView?code="+panel['code']
currentLocation = panel['currentLocation']['name']
currentStage = panel['currentStage']['name']
shipmentDestination = ""
if panel['shipmentDestination'] is not None:
shipmentDestination = panel['shipmentDestination']['name']
componentType = panel['componentType']['code']
print("Current location: " + currentLocation)
print("Shipment destination: " + shipmentDestination)
print("Current stage: " + currentStage)
print("Link to idkpd page: " + dbLink)
if currentLocation != sheet.cell(irow, cell_currentLocation.col).value:
sheet.update_cell(irow, cell_currentLocation.col, currentLocation)
time.sleep(2)
if currentStage != sheet.cell(irow, cell_currentStage.col).value:
sheet.update_cell(irow, cell_currentStage.col, currentStage)
time.sleep(2)
if shipmentDestination != sheet.cell(irow, cell_shipmentDestination.col).value:
sheet.update_cell(irow, cell_shipmentDestination.col, shipmentDestination)
time.sleep(2)
if dbLink != sheet.cell(irow, cell_dbLink.col).value:
sheet.update_cell(irow, cell_dbLink.col, dbLink)
time.sleep(2)
# test check
if checkTests:
panel_transition = transitions.Panel(panelid, client)
transition = transitions.available[panel['currentStage']['code']]
pass_test = transition.check_tests(panel_transition, ignore_tests=["STATUS", "PICTURE", "HV_ENABLE", "VISUAL_INSPECTION", "HVSENSE", "DCDCEFFICIENCY"])
print("#PB Pass tests: "+str(pass_test))
if pass_test != sheet.cell(irow, cell_passTests.col).value:
sheet.update_cell(irow, cell_passTests.col, pass_test)
time.sleep(2)
def updatePanel(sheet, irow, client, checkTests = False, flex_array_batch="2021-12-01", carrier_card_batch = "20220112", bpol_type='BPOL12V4'):
panelid = sheet.cell(irow, 1).value
time.sleep(1)
#panel = transitions.Panel(panelid, client)
print("Panle ID: "+panelid)
panel = None
try:
panel = client.get('getComponent', json={'component':panelid, 'alternativeIdentifier':True})
except Exception:
print('Carrier card of ' + panelid +' not found in DB..')
if panel is None:
# check if flex array exists:
flexArray = None
try:
flexArray = client.get('getComponent', json={'component':'20USBPF'+panelid})
except Exception:
print('Flex array of '+panelid+' not found in DB')
if flexArray is None:
print('Register powerboard flex array for '+panelid+" now...")
pwbdbtools.registerFlexArray(client, panelid, flex_array_batch)
flexArray = client.get('getComponent', json={'component':'20USBPF'+panelid})
print('Found flex array: '+flexArray['code'])
currentStage = flexArray['currentStage']['code']
currentLocation = flexArray['currentLocation']['code']
print("Current stage: " + currentStage)
if currentStage == 'RCP':
print('====== Next step: transition to SMD_LOAD stage...')
pbv3_panel_transition.panel_transition(client, panelid, 'SMD_LOAD', force=True)
flexArray = client.get('getComponent', json={'component':'20USBPF'+panelid})
currentStage = flexArray['currentStage']['code']
updateSheet(sheet, irow, flexArray, panelid, client, False)
if currentStage == 'SMD_LOAD':
print('====== Next step: load SMD components (linPOL, bPOL)...')
pbv3_panel_smdload.loop_smdload(panelid, client, bpol_type=bpol_type, linpol_type='LINPOL12V', currentLocation = currentLocation)
print('====== Next step: transition to MAN_LOAD stage...')
pbv3_panel_transition.panel_transition(client, panelid, 'MAN_LOAD', force=True)
flexArray = client.get('getComponent', json={'component':'20USBPF'+panelid})
currentStage = flexArray['currentStage']['code']
updateSheet(sheet, irow, flexArray, panelid, client, False)
if currentStage == 'MAN_LOAD':
print('====== Next step: load coils...')
pbv3_panel_loadcoils.soldercoils(client, panelid, currentLocation)
print('====== Next step: load shield box...')
sb_version = sheet.cell(irow, 2).value
time.sleep(1)
sb_batch = sheet.cell(irow, 3).value
time.sleep(1)
if (sb_version is not None) and (sb_batch is not None):
for icol in range(4, 14):
pb_num = sheet.cell(irow, icol).value
time.sleep(1)
if pb_num is not None:
pbv3_panel_loadshieldbox.loadshieldbox(client, panelid, icol-4, int(pb_num), sb_version = int(sb_version), sb_batch = int(sb_batch))
print('====== Next step: singulate flex array and bond powerboard to carrier card...')
pbv3_panel_singulate.singulate(client, panelid, carrier_card_batch)
flexArray = client.get('getComponent', json={'component':'20USBPF'+panelid})
currentStage = flexArray['currentStage']['code']
updateSheet(sheet, irow, flexArray, panelid, client, False)
# now get panel again:
try:
panel = client.get('getComponent', json={'component':panelid, 'alternativeIdentifier':True})
updateSheet(sheet, irow, panel, panelid, client, False)
except Exception:
print('Carrier card of ' + panelid +' not found in DB..')
if panel is not None:
print('Panel is on carrier card '+panel['code'])
currentStage = panel['currentStage']['code']
currentLocation = panel['currentLocation']['code']
if currentStage == 'MAN_LOAD':
print('====== Next step: transition to BONDED stage...')
pbv3_panel_transition.panel_transition(client, panelid, 'BONDED', force=True)
print('====== Next step: load hv mux...')
pbv3_panel_loadhvmux.loadhvmux(client, panelid, currentLocation)
print('====== Next step: load amac...')
cell_wafer = sheet.find("AMAC wafer number")
time.sleep(1)
icol_wafer = cell_wafer.col
wafer = sheet.cell(irow, icol_wafer).value
time.sleep(1)
for icol in range(icol_wafer+1, icol_wafer+11):
die_num_s = str(sheet.cell(irow, icol).value)
time.sleep(1)
die_num = 0
amac_sn = ''
if "20US" in die_num_s:
amac_sn = die_num_s
elif die_num_s[0].isdigit():
die_num = int(die_num_s)
else:
die_num = int(die_num_s[1:])
try:
pbv3_panel_loadamac.loadwafer(client, panelid, icol-icol_wafer-1, wafer, die_num, amac_sn)
except Exception:
print("Ooooops.... Failed to load amac "+str(die_num))
panel = client.get('getComponent', json={'component':panelid, 'alternativeIdentifier':True})
currentStage = panel['currentStage']['code']
if currentStage == "LOADED" or currentStage == "HYBBURN": # PB is no longer on panel anymore
updateSheet(sheet, irow, panel, panelid, client, checkTests=False)
else:
updateSheet(sheet, irow, panel, panelid, client, checkTests=checkTests)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Auto register powerboards from google sheet.")
parser.add_argument("-b","--bpol" , default='BPOL12V4', help="Assemble bPOL12V of given type.")
parser.add_argument("-c","--carrier" , default='20220112', help="powerboard carrier card batch.")
parser.add_argument("-f","--flex" , default='2021-12-01', help="powerboard flex array batch.")
parser.add_argument("-t","--waittime" , default=10, help="Time (minutes) to wait to repeat the scan of the whole google sheet")
parser.add_argument("-k","--key" , default='1GjACRhuTt9wX9Ld3mPJSpS3wtO5JtTXrY_0NnrtGnJQ', help="Google sheet file key string")
parser.add_argument("code1" , help="Access Code 1.")
parser.add_argument("code2" , help="Access Code 2.")
args = parser.parse_args()
user = itkdb.core.User(args.code1, args.code2)
user.authenticate()
if user.is_authenticated():
print('itkdb login successful!')
else:
print('Login unsuccessful...')
sys.exit(1)
print('itkdb authenticate will expire in : '+str(user.expires_in) + ' s')
client=itkdb.Client(user=user)
gc = gspread.service_account(filename='googleToken.json')
'''
# block of code to test single panel
gsheet = gc.open_by_key(args.key)
time.sleep(1)
sheets = gsheet.worksheets()
time.sleep(1)
sheet=sheets[4]
updatePanel(sheet, 6, client, checkTests=True, flex_array_batch=args.flex, carrier_card_batch=args.carrier, bpol_type=args.bpol)
'''
while True:
time_start = time.time()
try:
gsheet = gc.open_by_key(args.key)
time.sleep(1)
sheets = gsheet.worksheets()
time.sleep(1)
for isheet in range(len(sheets)):
sheet = sheets[isheet]
time.sleep(1)
values_all = sheet.get_all_values()
time.sleep(1)
cell_ready = sheet.find("Ready for DB register?")
time.sleep(1)
if cell_ready is not None:
for irow in range(2, len(values_all)):
if values_all[irow][cell_ready.col-1] == "Yes":
# check if login is expired
print('idkdb authenticate will expire in : '+str(user.expires_in) + ' s')
if user.is_expired() or user.expires_in < 300:
print('itkdb login: less than 5 mins left before expires => get new authentication now.')
user = itkdb.core.User(args.code1, args.code2)
user.authenticate()
client=itkdb.Client(user=user)
try:
updatePanel(sheet, irow+1, client, checkTests=True, flex_array_batch=args.flex, carrier_card_batch=args.carrier, bpol_type=args.bpol)
except Exception:
print("Ooooops.... Failed to update panel "+str(sheet.cell(irow, 1).value))
time.sleep(1)
except Exception:
print("Something unexpected happened.... will continue... ")
time_end = time.time()
if time_end - time_start < 60*int(args.waittime):
sleep_time = int(60*int(args.waittime) - time_end + time_start)
print("Sleep for " + str(sleep_time) + " seconds before doing a new scan of the google sheet")
time.sleep(sleep_time)
Loading