Commit 3a3adac6 authored by Alejandro Alvarez Ayllon's avatar Alejandro Alvarez Ayllon
Browse files

FTS-31: Use random numbers for the 'hashed_id'

Same approach as FTS3. Also, a test was added to check the uniformity of these ids.
parent a868a697
......@@ -37,6 +37,7 @@ BuildRequires: python26-devel
%else
BuildRequires: python-devel
%endif
BuildRequires: scipy
Requires: gridsite%{?_isa} >= 1.7
Requires: httpd%{?_isa}
......
......@@ -9,8 +9,8 @@ from fts3rest.lib.middleware.fts3auth import authorize, authorized
from fts3rest.lib.middleware.fts3auth.constants import *
from pylons import request
from pylons.controllers.util import abort
import hashlib
import json
import random
import re
import socket
import types
......@@ -32,14 +32,6 @@ DEFAULT_PARAMS = {
'retry' : 0
}
def _hashed_id(id):
assert id is not None
digest = hashlib.md5(str(id)).digest()
b16digest = digest.encode('hex')
return int(b16digest[:4], 16)
def _set_job_source_and_destination(job):
"""
Iterates through the files that belong to the job, and determines the
......@@ -105,7 +97,7 @@ def _validate_url(url):
raise ValueError('Missing host (%s)' % url)
def _populate_files(files_dict, findex):
def _populate_files(files_dict, findex, vo_name, shared_hashed_id = None):
"""
From the dictionary files_dict, generate a list of transfers for a job
"""
......@@ -132,6 +124,7 @@ def _populate_files(files_dict, findex):
file.dest_surl = d
file.source_se = _get_storage_element(s)
file.dest_se = _get_storage_element(d)
file.vo_name = vo_name
file.user_filesize = files_dict.get('filesize', None)
if file.user_filesize is None:
......@@ -142,6 +135,11 @@ def _populate_files(files_dict, findex):
file.file_metadata = files_dict.get('metadata', None)
file.activity = files_dict.get('activity', None)
if shared_hashed_id:
file.hashed_id = shared_hashed_id
else:
file.hashed_id = random.randint(0, 2**16 - 1)
files.append(file)
return files
......@@ -197,10 +195,16 @@ def _setup_job_from_dict(job_dict, user):
job.job_metadata = params['job_metadata']
job.job_params = str()
# If reuse is enabled, generate one single "hash" for all files
if job.reuse_job:
shared_hashed_id = random.randint(0, 2**16 - 1)
else:
shared_hashed_id = None
# Files
findex = 0
for t in job_dict['files']:
job.files.extend(_populate_files(t, findex))
job.files.extend(_populate_files(t, findex, job.vo_name, shared_hashed_id))
findex += 1
if len(job.files) == 0:
......@@ -405,26 +409,15 @@ class JobsController(BaseController):
abort(400,
'Can not specify reuse and multiple replicas at the same time')
# Update the optimizer
for file in job.files:
optimizer_active = OptimizerActive()
optimizer_active.source_se = file.source_se
optimizer_active.dest_se = file.dest_se
Session.merge(optimizer_active)
# Update the database
Session.merge(job)
Session.flush()
# Update hashed_id and vo_name, while updating OptimizerActive
# Mind that, for reuse jobs, we hash the job_id!
hashed_job_id = _hashed_id(job.job_id)
for file in Session.query(File).filter(File.job_id == job.job_id):
if job.reuse_job:
file.hashed_id = hashed_job_id
else:
file.hashed_id = _hashed_id(file.file_id)
file.vo_name = job.vo_name
Session.merge(file)
optimizer_active = OptimizerActive()
optimizer_active.source_se = file.source_se
optimizer_active.dest_se = file.dest_se
Session.merge(optimizer_active)
# Commit and return
Session.commit()
return job
......@@ -3,6 +3,7 @@ from fts3rest.lib.base import Session
from fts3.model import Job, File, OptimizerActive
import hashlib
import json
import scipy.stats
class TestJobs(TestController):
......@@ -11,12 +12,6 @@ class TestJobs(TestController):
The focus is in submissions, since it is the one that modifies the database
"""
def _hashedId(self, id):
digest = hashlib.md5(str(id)).digest()
b16digest = digest.encode('hex')
return int(b16digest[:4], 16)
def _validateSubmitted(self, job, noVo=False):
self.assertNotEqual(job, None)
files = job.files
......@@ -51,7 +46,6 @@ class TestJobs(TestController):
else:
self.assertEqual(files[0].vo_name, 'testvo')
self.assertEquals(self._hashedId(files[0].file_id), files[0].hashed_id)
self.assertEquals(files[0].activity, 'default')
# Validate optimizer too
......@@ -479,3 +473,37 @@ class TestJobs(TestController):
job = Session.query(Job).get(jobId)
self._validateSubmitted(job)
def test_files_balanced(self):
    """
    Check that the file 'hashed_id' values are reasonably uniformly
    distributed.

    hashed_id is a legacy name; its purpose is to balance transfers
    between hosts regardless of how many are running at a given moment.
    """
    self.setupGridsiteEnvironment()
    self.pushDelegation()

    # Submit one job with 1000 single-source/single-destination transfers,
    # so we get 1000 independently generated hashed_ids back.
    files = []
    for r in xrange(1000):
        files.append({
            'sources': ["root://source.es/file%d" % r],
            'destinations': ["root://dest.ch/file%d" % r]
        })

    job = {'files': files}

    answer = self.app.put(url = "/jobs",
                          params = json.dumps(job),
                          status = 200)
    submitted = json.loads(answer.body)

    hashed_ids = map(lambda f: f['hashed_id'], submitted['files'])

    # Null hypothesis: the distribution of hashed_ids is uniform over the
    # full 16-bit range. A chi-square test on the histogram should then
    # fail to reject it (large p-value).
    # NOTE(review): scipy.stats.histogram was deprecated and later removed
    # from SciPy; confirm the pinned scipy version still provides it.
    # NOTE: 'min' here shadows the builtin; it is the histogram lower bound.
    histogram, min, binsize, outsiders = scipy.stats.histogram(hashed_ids, defaultlimits = (0, 2**16 - 1))
    chisq, pvalue = scipy.stats.chisquare(histogram)
    self.assertTrue(min >= 0)
    # No value may fall outside [0, 2**16 - 1]
    self.assertEqual(outsiders, 0)
    # p-value above 0.1 => no evidence against uniformity
    self.assertTrue(pvalue > 0.1)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment