Commit 479256fa authored by Alejandro Alvarez Ayllon's avatar Alejandro Alvarez Ayllon
Browse files

FTS-469: Send submit message

parent fcf01d2b
......@@ -45,10 +45,12 @@ BuildRequires: python-requests
BuildRequires: python-slimit
%endif
BuildRequires: pandoc
BuildRequires: python-dirq
Requires: gridsite%{?_isa} >= 1.7
Requires: httpd%{?_isa}
Requires: mod_wsgi
Requires: python-dirq
Requires: python-fts = %{version}-%{release}
Requires: python-paste-deploy
Requires: python-pylons
......
......@@ -8,9 +8,10 @@ DbConnectString=
Alias=Testing
Infosys=lcg-bdii.cern.ch:2170
AuthorizedVO=testers.eu-emi.eu;*
AuthorizedVO=dteam;*
CleanRecordsHost=false
MessagingDirectory=/tmp/rest-msgs
[roles]
Public=vo:transfer;all:datamanagement
......
......@@ -21,6 +21,8 @@ from requests.exceptions import HTTPError
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import noload
from fts3rest.lib.helpers.msgbus import submit_state_change
try:
import simplejson as json
except ImportError:
......@@ -582,6 +584,15 @@ class JobsController(BaseController):
Session.rollback()
raise
# Send messages
# Need to re-query so we get the file ids
job = Session.query(Job).get(populated.job_id)
for transfer in job.files:
try:
submit_state_change(job, transfer)
except Exception, e:
log.warning("Failed to write state message to disk: %s" % e.message)
if len(populated.files):
log.info("Job %s submitted with %d transfers" % (populated.job_id, len(populated.files)))
elif len(populated.datamanagement):
......
# Copyright notice:
# Copyright CERN, 2016.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import pylons
import socket
import time
from dirq.QueueSimple import QueueSimple
try:
import simplejson as json
except:
import json
def submit_state_change(job, transfer):
"""
Writes a state change message to the dirq
"""
msg_dir = pylons.config.get('fts3.MessagingDirectory', '/var/lib/fts3')
msg_dir = os.path.join(msg_dir, 'status')
msg = dict(
endpnt=socket.getfqdn(),
user_dn=job['user_dn'],
src_url=transfer['source_surl'],
dst_url=transfer['dest_surl'],
vo_name=job['vo_name'],
source_se=transfer['source_se'],
dest_se=transfer['dest_se'],
job_id=job['job_id'],
file_id=transfer['file_id'],
job_state=job['job_state'],
file_state=transfer['file_state'],
retry_counter=0,
retry_max=0,
timestamp=time.time()*1000,
job_metadata=job['job_metadata'],
file_metadata=transfer['file_metadata'],
)
raw = "SS " + json.dumps(msg)
q = QueueSimple(path=msg_dir)
q.add(raw)
......@@ -26,6 +26,7 @@ setup-app`) and provides the base testing objects.
"""
import os
import pylons.test
import shutil
import time
import types
......@@ -34,7 +35,7 @@ from unittest import TestCase
from M2Crypto import ASN1, X509, RSA, EVP
from M2Crypto.ASN1 import UTC
from paste.script.appinstall import SetupCommand
from pylons import url
from pylons import url, config
from routes.util import URLGenerator
from webtest import TestApp, TestRequest
......@@ -265,6 +266,13 @@ class TestController(TestCase):
Session.query(ServerConfig).delete()
Session.commit()
# Delete messages
if 'fts3.MessagingDirectory' in config:
try:
shutil.rmtree(config['fts3.MessagingDirectory'])
except:
pass
# Handy asserts not available in the EPEL-6 version
def assertGreater(self, a, b):
if not a > b:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment