diff --git a/pom.xml b/pom.xml index e455e6150f4eb88c93288494fd7e1186d1a52a2c..4da876fa05004b72e53660edba8b6d2877ba3840 100644 --- a/pom.xml +++ b/pom.xml @@ -1,96 +1,113 @@ <?xml version="1.0" encoding="UTF-8"?> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> - <artifactId>c2mon-web-atmosphere</artifactId> - <version>0.3.1-SNAPSHOT</version> - <packaging>jar</packaging> + <artifactId>c2mon-web-atmosphere</artifactId> + <version>0.3.1-SNAPSHOT</version> + <packaging>jar</packaging> - <name>Atmosphere publisher for C2MON</name> - <description>Atmosphere publisher for C2MON</description> + <name>Atmosphere publisher for C2MON</name> + <description>Atmosphere publisher for C2MON</description> - <parent> - <groupId>cern.c2mon.client</groupId> - <artifactId>c2mon-client</artifactId> - <version>1.9.7-SNAPSHOT</version> - </parent> + <parent> + <groupId>cern.c2mon.client</groupId> + <artifactId>c2mon-client</artifactId> + <version>1.9.7-SNAPSHOT</version> + </parent> - <scm> - <url>https://gitlab.cern.ch/c2mon/c2mon-web-atmosphere</url> - <connection>scm:git:ssh://git@gitlab.cern.ch/c2mon/c2mon-web-atmosphere.git</connection> - <developerConnection>scm:git:ssh://git@gitlab.cern.ch:7999/c2mon/c2mon-web-atmosphere.git</developerConnection> - <tag>HEAD</tag> - </scm> + <scm> + <url>https://gitlab.cern.ch/c2mon/c2mon-web-atmosphere</url> + <connection>scm:git:ssh://git@gitlab.cern.ch/c2mon/c2mon-web-atmosphere.git</connection> + <developerConnection>scm:git:ssh://git@gitlab.cern.ch:7999/c2mon/c2mon-web-atmosphere.git</developerConnection> + <tag>HEAD</tag> + </scm> - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <java.version>8</java.version> - <surefire.c2mon.client.jms.url>http://dipcm.web.cern.ch</surefire.c2mon.client.jms.url> - <surefire.c2mon.atmo.test.timeout>30</surefire.c2mon.atmo.test.timeout> - </properties> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <java.version>8</java.version> + <surefire.c2mon.client.jms.url>http://dipcm.web.cern.ch</surefire.c2mon.client.jms.url> + <surefire.c2mon.atmo.test.timeout>30</surefire.c2mon.atmo.test.timeout> + </properties> - <dependencies> - <dependency> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-starter-web</artifactId> - <version>${org.springframework.boot.version}</version> - </dependency> + <dependencies> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-web</artifactId> + <version>${org.springframework.boot.version}</version> + </dependency> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-websocket</artifactId> + <version>${org.springframework.boot.version}</version> + </dependency> - <dependency> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-starter-websocket</artifactId> - <version>${org.springframework.boot.version}</version> - </dependency> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.atmosphere</groupId> + <artifactId>atmosphere-spring</artifactId> + <version>2.5.2</version> + </dependency> + + <dependency> + <!-- C2MON dependencies --> + <groupId>cern.c2mon.client</groupId> + <artifactId>c2mon-client-core</artifactId> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> - <dependency> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-starter-test</artifactId> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.atmosphere</groupId> - <artifactId>atmosphere-spring</artifactId> - <version>2.5.2</version> - </dependency> - - <dependency> - <groupId>javax.inject</groupId> - <artifactId>javax.inject</artifactId> - <version>1</version> - </dependency> - <dependency> - <!-- C2MON dependencies --> - <groupId>cern.c2mon.client</groupId> - <artifactId>c2mon-client-core</artifactId> - </dependency> - - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - </dependency> - - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </dependency> - - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-websocket</artifactId> - <version>${org.springframework.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.projectlombok</groupId> - <artifactId>lombok</artifactId> - </dependency> - - </dependencies> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>org.webjars</groupId> + <artifactId>webjars-locator</artifactId> + <version>[0.30,)</version> + </dependency> + <dependency> + <groupId>org.webjars</groupId> + <artifactId>atmosphere-javascript</artifactId> + <version>3.0.4</version> + </dependency> + + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-websocket</artifactId> + <version>${org.springframework.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.projectlombok</groupId> + <artifactId>lombok</artifactId> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <release>${java.version}</release> + <source>${java.version}</source> + <target>${java.version}</target> + </configuration> + </plugin> + </plugins> + </build> </project> diff --git a/src/main/java/cern/c2mon/client/atmosphere/C2MonBroadcastService.java b/src/main/java/cern/c2mon/client/atmosphere/C2MonBroadcastService.java index 74d521aef16e50a5f0d55c3e1b48db04c34788ea..da6b013e9a82fab6cb677f62e3285f846d2c2a94 100644 --- a/src/main/java/cern/c2mon/client/atmosphere/C2MonBroadcastService.java +++ b/src/main/java/cern/c2mon/client/atmosphere/C2MonBroadcastService.java @@ -1,16 +1,17 @@ package cern.c2mon.client.atmosphere; import java.io.IOException; +import java.util.AbstractMap; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; -import javax.inject.Inject; - import org.atmosphere.config.managed.Decoder; import org.atmosphere.config.managed.Encoder; import org.atmosphere.config.service.DeliverTo; @@ -24,6 +25,7 @@ import org.atmosphere.cpr.AtmosphereResource; import org.atmosphere.cpr.AtmosphereResourceEvent; import org.atmosphere.cpr.Broadcaster; import org.atmosphere.cpr.BroadcasterFactory; +import org.atmosphere.cpr.BroadcasterListenerAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -35,216 +37,325 @@ import cern.c2mon.client.common.tag.Tag; import cern.c2mon.client.core.service.TagService; /** - * This Atmosphere service accepts subscription requests via GET, POST or WebSocket messages. - * It connects C2MON tags to Atmosphere broadcasters to allow the forwarding of data. + * This Atmosphere service accepts subscription requests via GET, POST or + * WebSocket messages. It connects C2MON tags to Atmosphere broadcasters to + * allow the forwarding of data. * * @author bcopy */ -@ManagedService(path = "/broadcast") -public class C2MonBroadcastService { - private final Logger logger = LoggerFactory.getLogger(C2MonBroadcastService.class); - - private static TagUpdateCodec tagUpdatesCodec = new TagUpdateCodec(); - - @Inject - TagService c2monTagService; - - private AtomicBoolean initialized = new AtomicBoolean(); - - private BroadcasterFactory m_broadcasterFactory; - - @Ready - public void onReady(final AtmosphereResource resource) { - this.logger.info("Connected", resource.uuid()); - obtainBroadcasterFactory(resource); - } - - private void obtainBroadcasterFactory(final AtmosphereResource resource) { - if (!initialized.getAndSet(true)) { - m_broadcasterFactory = resource.getAtmosphereConfig().getBroadcasterFactory(); - } - } - - @Disconnect - public void onDisconnect(AtmosphereResourceEvent event) { - this.logger.info("Client {} disconnected [{}]", event.getResource().uuid(), - (event.isCancelled() ? "cancelled" : "closed")); - } - - @Resume - public void onTimeout(AtmosphereResourceEvent e) { - if (e.isResumedOnTimeout()) { - try { - e.getResource().close(); - } catch (IOException e1) { - this.logger.warn("Exception while closing resource : ", e1); - } - } - - // TODO : Add C2MON tag listener cleanup - } - - @Get - public void onGetSubscribe(AtmosphereResource resource) { - obtainBroadcasterFactory(resource); -// String[] subscriptionsIdToAdd = resource.getRequest().getParameterValues("id"); -// -// List<SubscriptionUpdateTagIdentifier> tagIdentifiersById = Arrays.asList(subscriptionsIdToAdd).stream() -// .map(id -> SubscriptionUpdateTagIdentifier.builder().id(Long.valueOf(id)).build()) -// .collect(Collectors.toList()); -// -// String[] subscriptionsNamesToAdd = resource.getRequest().getParameterValues("name"); -// -// List<SubscriptionUpdateTagIdentifier> tagIdentifiersByName = Arrays.asList(subscriptionsNamesToAdd).stream() -// .map(name -> SubscriptionUpdateTagIdentifier.builder().name(name).build()) -// .collect(Collectors.toList()); -// -// -// tagIdentifiersById.addAll(tagIdentifiersByName); +@ManagedService(path = "/broadcast/{view: [a-zA-Z][a-zA-Z_0-9]*}") +public class C2MonBroadcastService extends BroadcasterListenerAdapter { + private final Logger logger = LoggerFactory.getLogger(C2MonBroadcastService.class); + + private static TagUpdateCodec tagUpdatesCodec = new TagUpdateCodec(); + + @Autowired + TagService c2monTagService; + + private AtomicBoolean initialized = new AtomicBoolean(); + + private BroadcasterFactory broadcasterFactory; + + private Broadcaster broadcaster; + + private final ObjectMapper mapper = new ObjectMapper(); + + public C2MonBroadcastService() { + } + + @Ready + public void onReady(final AtmosphereResource resource, Broadcaster bcaster) { + if (this.logger.isInfoEnabled()) { + this.logger.info("Connected {}", resource.uuid()); + } + obtainBroadcasterFactory(resource); + + } + + private void obtainBroadcasterFactory(final AtmosphereResource resource) { + if (!initialized.getAndSet(true)) { + broadcasterFactory = resource.getAtmosphereConfig().getBroadcasterFactory(); + broadcasterFactory.addBroadcasterListener(this); + broadcaster = resource.getBroadcaster(); + } + } + + @Disconnect + public void onDisconnect(AtmosphereResourceEvent event) { + if (logger.isInfoEnabled()) { + this.logger.info("Client {} disconnected [{}]", event.getResource().uuid(), + (event.isCancelled() ? "cancelled" : "closed")); + } + } + + @Resume + public void onTimeout(AtmosphereResourceEvent e) { + if (e.isResumedOnTimeout()) { + try { + e.getResource().close(); + } catch (IOException e1) { + this.logger.warn("Exception while closing resource : ", e1); + } + } + } + + @Get + public void onGetSubscribe(AtmosphereResource resource) { + obtainBroadcasterFactory(resource); + String[] subscriptionsIdToAdd = resource.getRequest().getParameterValues("id"); + List<SubscriptionUpdateTagIdentifier> tagIdentifiersById = new ArrayList<SubscriptionUpdateTagIdentifier>(); + if (subscriptionsIdToAdd.length > 0) { + tagIdentifiersById = Arrays.asList(subscriptionsIdToAdd).stream() + .map(id -> SubscriptionUpdateTagIdentifier.builder().id(Long.valueOf(id)).build()) + .collect(Collectors.toList()); + + String[] subscriptionsNamesToAdd = resource.getRequest().getParameterValues("name"); + + List<SubscriptionUpdateTagIdentifier> tagIdentifiersByName = Arrays.asList(subscriptionsNamesToAdd).stream() + .map(name -> SubscriptionUpdateTagIdentifier.builder().name(name).build()) + .collect(Collectors.toList()); + + tagIdentifiersById.addAll(tagIdentifiersByName); + } resource.suspend(); -// subscribeTagsByTagIdentifier(resource, tagIdentifiersById); - } - public Set<Long> subscribeTagsByTagIdentifier(AtmosphereResource resource, List<SubscriptionUpdateTagIdentifier> subscriptionsToAdd) { - if(!resource.isSuspended()) { - resource.suspend(); + if (!tagIdentifiersById.isEmpty()) { + subscribeTagsByTagIdentifier(resource, tagIdentifiersById); } - - Set<Long> tagIdsList = subscriptionsToAdd.stream().map(SubscriptionUpdateTagIdentifier::getId).collect(Collectors.toSet()); - Set<String> tagNamesList = subscriptionsToAdd.stream().map(SubscriptionUpdateTagIdentifier::getName).collect(Collectors.toSet()); + } + + public Set<SimpleTag> queryTagInformation(AtmosphereResource resource, + List<SubscriptionUpdateTagIdentifier> tagIdentifiersList) { + Set<Long> tagsIdsToQuery = convertTagIdentifiersToTagIDs( + tagIdentifiersList.stream().filter(ident -> ident != null && ident.id != null) + .map(SubscriptionUpdateTagIdentifier::getId).collect(Collectors.toSet()) + ,tagIdentifiersList.stream().filter(ident -> ident != null && ident.name != null) + .map(SubscriptionUpdateTagIdentifier::getName).collect(Collectors.toSet()) + ,tagIdentifiersList.stream().filter(ident -> ident != null && ident.metakey != null && ident.metavalue != null) + .map(t -> { return new AbstractMap.SimpleEntry<String,String>(t.getMetakey(), t.getMetavalue()); }).collect(Collectors.toList())); + Set<SimpleTag> queryResult = new HashSet<>(); + try { + + for (Tag tagToAdd : c2monTagService.get(tagsIdsToQuery)) { + queryResult.add(new SimpleTagImpl(tagToAdd)); + } + } catch (Exception e) { + this.logger.warn("Failed tag query by ID for resource {} : {}", resource, e); + } + return queryResult; + } + + public Collection<SubscriptionUpdateTagIdentifier> subscribeTagsByTagIdentifier(AtmosphereResource resource, + List<SubscriptionUpdateTagIdentifier> tagIdentifiersList) { + Set<Long> tagsIdsToSubscribe = convertTagIdentifiersToTagIDs( + tagIdentifiersList.stream().filter(ident -> ident != null && ident.id != null) + .map(SubscriptionUpdateTagIdentifier::getId).collect(Collectors.toSet()), + tagIdentifiersList.stream().filter(ident -> ident != null && ident.name != null) + .map(SubscriptionUpdateTagIdentifier::getName).collect(Collectors.toSet()) + ,null); Set<Long> validTagIds = new HashSet<>(); - if(!tagIdsList.isEmpty() && tagIdsList.iterator().next() != null) { - this.logger.debug("Subscribing to tags IDs {}", tagIdsList.stream().map( Object::toString ).collect(Collectors.joining(","))); - } - if(!tagNamesList.isEmpty() && tagNamesList.iterator().next() != null) { - this.logger.debug("Subscribing to tags Names {}", tagNamesList.stream().collect(Collectors.joining(","))); - } - try { - // TODO : We should validate the tags before subscribing - for (Long tagToAdd : tagIdsList) { - if (tagToAdd != null) { - if (this.logger.isDebugEnabled()) { - this.logger.debug("Associating res {} with tag id {} ", resource.uuid(), tagToAdd); - } - - m_broadcasterFactory.lookup(tagToAdd, true).addAtmosphereResource(resource); - validTagIds.add(tagToAdd); +// for (Long tagToAdd : tagsIdsToSubscribe) { +// if (this.logger.isDebugEnabled()) { +// this.logger.debug("Associating res {} with tag id {} ", resource.uuid(), tagToAdd); +// } +// broadcasterFactory.lookup(tagToAdd, true).addAtmosphereResource(resource); +// validTagIds.add(tagToAdd); +// } + broadcaster.addAtmosphereResource(resource); + if (!tagsIdsToSubscribe.isEmpty()) { + c2monTagService.subscribe(tagsIdsToSubscribe, broadcastingTagListener); + } else { + if (this.logger.isDebugEnabled()) { + this.logger.debug("Subscription request with an empty list !"); } - } - // TODO : Only subscribe to valid tags and return only those in the return result. - c2monTagService.subscribe(validTagIds, broadcastingTagListener); - } catch (Exception e) { - this.logger.warn("Failed tag subscription by ID : ", e); - } + } + } catch (Exception e) { + this.logger.warn("Failed tag subscription by ID : ", e); + } + + return validTagIds.stream().map(tagId -> SubscriptionUpdateTagIdentifier.builder().id(tagId).build()) + .collect(Collectors.toSet()); + } + + public Set<Long> unsubscribeTagsByTagIdentifier(AtmosphereResource resource, + List<SubscriptionUpdateTagIdentifier> tagIdentifiersList) { + Set<Long> tagsIdsToUnsubscribe = convertTagIdentifiersToTagIDs( + tagIdentifiersList.stream().filter(ident -> ident != null && ident.id != null) + .map(SubscriptionUpdateTagIdentifier::getId).collect(Collectors.toSet()) + ,tagIdentifiersList.stream().filter(ident -> ident != null && ident.name != null) + .map(SubscriptionUpdateTagIdentifier::getName).collect(Collectors.toSet()) + ,null); + + Set<Long> validTagIds = new HashSet<>(); try { - Set<Long> newTagNameIds = new HashSet<>(); - // TODO : We should validate the tags before subscribing - for (String tagNameToAdd : tagNamesList) { - if(tagNameToAdd!= null){ - Collection<Tag> tags = c2monTagService.findByName(tagNameToAdd); - for (Tag tag: tags) { - if(this.logger.isDebugEnabled()) { - this.logger.debug("Associating res {} with tag id {} (tag name {}) ", resource.uuid(), tag.getId(), tag.getName()); - } - m_broadcasterFactory.lookup(tag.getId(), true).addAtmosphereResource(resource); - newTagNameIds.add(tag.getId()); - } + for (Long tagID : tagsIdsToUnsubscribe) { + if (this.logger.isDebugEnabled()) { + this.logger.debug("Associating res {} with tag id {} ", resource.uuid(), tagID); } +// Broadcaster bcaster = broadcasterFactory.lookup(tagID, false); +// if (bcaster != null) { +// bcaster.removeAtmosphereResource(resource); +// } + validTagIds.add(tagID); } - // TODO : Only subscribe to valid tags and return only those in the return result. - c2monTagService.subscribe(newTagNameIds, broadcastingTagListener); + c2monTagService.unsubscribe(tagsIdsToUnsubscribe, broadcastingTagListener); } catch (Exception e) { - this.logger.warn("Failed tag subscription: ", e); + this.logger.warn("Failed tag unsubscription by ID : ", e); + } + + return validTagIds; + } + + private final Set<Long> convertTagIdentifiersToTagIDs(final Set<Long> tagIdsList + , final Set<String> tagNamesList + , final List<Map.Entry<String,String>> metadataPairs) { + Set<Long> result = new HashSet<>(); + + if ((tagIdsList != null) && !tagIdsList.isEmpty()) { + if (logger.isDebugEnabled()) { + this.logger.debug("Looking up tags IDs {}", + tagIdsList.stream().map(Object::toString).collect(Collectors.joining(","))); + } + result.addAll(tagIdsList); + } + + { + if ((tagNamesList != null) && !tagNamesList.isEmpty() && logger.isDebugEnabled()) { + this.logger.debug("Looking up tags Names {}", tagNamesList.stream().collect(Collectors.joining(","))); + } + // Convert all tag names to corresponding tag IDs + result.addAll(convertTagNameToIds(tagNamesList)); + } + + if(metadataPairs!=null) + { + for(Map.Entry<String,String> pair : metadataPairs) { + if(pair != null && pair.getKey() != null) { + try { + result.addAll(convertMetadataToTagIds(pair.getKey(), pair.getValue())); + }catch(Exception e) { + this.logger.error("Invalid metadata query {}, exception : {}", pair.toString(), e ); + } + } + } } - return tagIdsList; + return result; } - @Message(encoders = JacksonEncoderDecoder.class, decoders = JacksonEncoderDecoder.class) - @DeliverTo(DeliverTo.DELIVER_TO.RESOURCE) - public SubscriptionUpdateMessage onMessage(AtmosphereResource resource, SubscriptionUpdateMessage message) { - if(message.add.size() > 0){ - // TODO : Validate incoming URIs -// addSubscriptionURIsToResource(resource, message.add); - subscribeTagsByTagIdentifier(resource, message.add); - } - // TODO: Support message.remove - - // TODO : Return a message with all the subscriptions that were added, all that were removed - return message; - } - - public static class JacksonEncoderDecoder - implements Encoder<SubscriptionUpdateMessage, String>, Decoder<String, SubscriptionUpdateMessage> { - - private final ObjectMapper mapper = new ObjectMapper(); - - @Override - public String encode(SubscriptionUpdateMessage m) { - try { - return this.mapper.writeValueAsString(m); - } catch (IOException ex) { - throw new IllegalStateException(ex); - } - } - - @Override - public SubscriptionUpdateMessage decode(String s) { - try { - return this.mapper.readValue(s, SubscriptionUpdateMessage.class); - } catch (IOException ex) { - throw new IllegalStateException(ex); - } - } - - } - -// @Post -// public void subscribeByURI(AtmosphereResource resource) { -// obtainBroadcasterFactory(resource); -// String[] subscriptionsToAdd = resource.getRequest().getParameterValues("uri"); -// addSubscriptionURIsToResource(resource, subscriptionsToAdd); -// -// } - -// private void addSubscriptionURIsToResource(AtmosphereResource resource, String[] subscriptionsToAdd) { -// Set<String> uris = Arrays.stream(subscriptionsToAdd).collect(Collectors.toSet()); -// -// for (String uri : uris) { -// try { -// Tag tag = c2monDynConfigService.getTagForURI(new URI(uri)); -// c2monTagService.subscribe(tag.getId(), broadcastingTagListener); -// -// m_broadcasterFactory.lookup(tag.getId(), true).addAtmosphereResource(resource.suspend()); -// } catch (URISyntaxException ex) { -// this.logger.error("Could not subscribe to {} : syntax exception", uri.toString(), ex); -// } -// } -// -// c2monTagService.refresh(); -// } - - TagListener broadcastingTagListener = new TagListener() { - - @Override - public void onUpdate(Tag tagUpdate) { - onInitialUpdate(Arrays.asList(new Tag[] { tagUpdate })); - } - - @Override - public void onInitialUpdate(Collection<Tag> initialValues) { - for (Tag tagUpdate : initialValues) { - // Lookup a broadcaster for each tag, and broadcast the update - // there - Broadcaster b = m_broadcasterFactory.lookup(tagUpdate.getId(), true); - if (b != null) { - b.broadcast(tagUpdatesCodec.encode(tagUpdate)); - } - } - - } - }; + private final Collection<Long> convertTagNameToIds(final Set<String> tagNamesList) { + Collection<Tag> tags = c2monTagService.findByName(tagNamesList); + return tags.parallelStream().map(tag -> tag.getId()).collect(Collectors.toList()); + } + + private final Collection<Long> convertMetadataToTagIds(final String key, final String value) { + Collection<Tag> tags = c2monTagService.findByMetadata(key, value); + return tags.parallelStream().map(tag -> tag.getId()).collect(Collectors.toList()); + } + + + @Message(encoders = SubscriptionUpdateMessageEncDec.class, decoders = SubscriptionUpdateMessageEncDec.class) + @DeliverTo(DeliverTo.DELIVER_TO.RESOURCE) + public SubscriptionUpdateMessage onMessage(AtmosphereResource resource, SubscriptionUpdateMessage message) { + resource.suspend(); + SubscriptionUpdateMessage reply = new SubscriptionUpdateMessage(); + if (!message.query.isEmpty()) { + reply.queryResult + .addAll(queryTagInformation(resource, message.query).stream().collect(Collectors.toList())); + } else if (!message.add.isEmpty()) { + subscribeTagsByTagIdentifier(resource, message.add); + } + else if (!message.remove.isEmpty()) { + unsubscribeTagsByTagIdentifier(resource, message.add); + } + + return reply; + } + + @Message(encoders = TagUpdateListCodec.class, decoders = TagUpdateListCodec.class) + public Collection<Tag> onMessage(AtmosphereResource resource, Collection<Tag> tagUpdate) { + return tagUpdate; + } + + public final class SubscriptionUpdateMessageEncDec + implements Encoder<SubscriptionUpdateMessage, String>, Decoder<String, SubscriptionUpdateMessage> { + + @Override + public String encode(SubscriptionUpdateMessage m) { + try { + return mapper.writeValueAsString(m); + } catch (IOException ex) { + logger.warn("Unable to encode message : ", ex); + throw new IllegalStateException(ex); + } + } + + @Override + public SubscriptionUpdateMessage decode(String s) { + try { + return mapper.readValue(s, SubscriptionUpdateMessage.class); + } catch (IOException ex) { + logger.warn("Unable to decode message : ", ex); + throw new IllegalStateException(ex); + } + } + + } + + /* + * // @Post // public void subscribeByURI(AtmosphereResource resource) { // + * obtainBroadcasterFactory(resource); // String[] subscriptionsToAdd = + * resource.getRequest().getParameterValues("uri"); // + * addSubscriptionURIsToResource(resource, subscriptionsToAdd); // // } + * + * // private void addSubscriptionURIsToResource(AtmosphereResource + * resource, String[] subscriptionsToAdd) { // Set<String> uris = + * Arrays.stream(subscriptionsToAdd).collect(Collectors.toSet()); // // for + * (String uri : uris) { // try { // Tag tag = + * c2monDynConfigService.getTagForURI(new URI(uri)); // + * c2monTagService.subscribe(tag.getId(), broadcastingTagListener); // // + * m_broadcasterFactory.lookup(tag.getId(), + * true).addAtmosphereResource(resource.suspend()); // } catch + * (URISyntaxException ex) { // + * this.logger.error("Could not subscribe to {} : syntax exception", + * uri.toString(), ex); // } // } // // c2monTagService.refresh(); // } + * + */ + + TagListener broadcastingTagListener = new TagListener() { + @Override + public void onUpdate(Tag tagUpdate) { + onInitialUpdate(Arrays.asList(tagUpdate)); + } + + @Override + public void onInitialUpdate(Collection<Tag> initialValues) { +// initialValues.parallelStream().forEach(tag -> { +// Broadcaster b = broadcasterFactory.lookup(tag.getId(), false); +// if (b != null) { +// b.broadcast(tag); +// } +// }); + broadcaster.broadcast(initialValues); + } + }; + +// /** +// * BroadcasterListener implementation that cleans up any tag subscriptions +// * when the broadcaster is removed. +// * +// * @param broadcaster +// * The broadcaster about to be destroyed. +// */ +// @Override +// public void onPreDestroy(Broadcaster b) { +// try { +// c2monTagService.unsubscribe(Long.valueOf(b.getID()), broadcastingTagListener); +// } catch (Exception e) { +// this.logger.warn("Could not unsubscribe from tag {} upon broadcaster destroy, cause : {} ", b.getID(), e); +// } +// } } diff --git a/src/main/java/cern/c2mon/client/atmosphere/SimpleTag.java b/src/main/java/cern/c2mon/client/atmosphere/SimpleTag.java index f47f56c602cc065d4bf2cc78dbc0bbae6cb14084..3e818b9a93f54a587fdef14ae6a2171d29cdbb10 100644 --- a/src/main/java/cern/c2mon/client/atmosphere/SimpleTag.java +++ b/src/main/java/cern/c2mon/client/atmosphere/SimpleTag.java @@ -1,12 +1,13 @@ package cern.c2mon.client.atmosphere; -import java.io.Serializable; - import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; import cern.c2mon.client.common.tag.Tag; -@JsonIgnoreProperties({"aliveTag", "aliveTagFlag", "controlTag", "controlTagFlag", "updateTagLock"}) +@JsonInclude(JsonInclude.Include.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +//value={"aliveTag", "aliveTagFlag", "controlTag", "controlTagFlag", "updateTagLock"}) public interface SimpleTag extends Tag { } diff --git a/src/main/java/cern/c2mon/client/atmosphere/SimpleTagImpl.java b/src/main/java/cern/c2mon/client/atmosphere/SimpleTagImpl.java index 8fdda53b3d137770f7508aa87ca2165018cc834f..d61db3fb423da800bd17cf5a7bed5af359a11f07 100644 --- a/src/main/java/cern/c2mon/client/atmosphere/SimpleTagImpl.java +++ b/src/main/java/cern/c2mon/client/atmosphere/SimpleTagImpl.java @@ -5,23 +5,27 @@ import java.io.Serializable; import com.fasterxml.jackson.annotation.JsonIgnore; import cern.c2mon.client.common.tag.Tag; +import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; import lombok.experimental.Delegate; @Data +@Builder +@NoArgsConstructor public class SimpleTagImpl implements SimpleTag, Serializable { /** * Generated serialization UID */ private static final long serialVersionUID = -476267784368913029L; - @Delegate(types=SimpleTag.class) @JsonIgnore - Tag delegate; + @Delegate(types=SimpleTag.class) + Tag tag; public SimpleTagImpl(Tag tag){ - delegate = tag; + this.tag = tag; } - + } diff --git a/src/main/java/cern/c2mon/client/atmosphere/SubscriptionUpdateMessage.java b/src/main/java/cern/c2mon/client/atmosphere/SubscriptionUpdateMessage.java index 34ecb96b623a8a0f2a31613e6ce6edc4e9946836..a46f8197389e5e14e7f940b406b78132d77ac6c0 100644 --- a/src/main/java/cern/c2mon/client/atmosphere/SubscriptionUpdateMessage.java +++ b/src/main/java/cern/c2mon/client/atmosphere/SubscriptionUpdateMessage.java @@ -3,12 +3,21 @@ package cern.c2mon.client.atmosphere; import java.util.ArrayList; import java.util.List; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; @Data +@Builder +@NoArgsConstructor +@AllArgsConstructor public class SubscriptionUpdateMessage { List<SubscriptionUpdateTagIdentifier> add = new ArrayList<>(); List<SubscriptionUpdateTagIdentifier> remove = new ArrayList<>(); + List<SubscriptionUpdateTagIdentifier> query = new ArrayList<>(); + List<SimpleTag> queryResult = new ArrayList<>(); + } diff --git a/src/main/java/cern/c2mon/client/atmosphere/SubscriptionUpdateTagIdentifier.java b/src/main/java/cern/c2mon/client/atmosphere/SubscriptionUpdateTagIdentifier.java index f5523d4cb47a03a45b756e112be5839a282d1e4e..22faa0d1a87a709f6398ad32f6b35183f6859ce2 100644 --- a/src/main/java/cern/c2mon/client/atmosphere/SubscriptionUpdateTagIdentifier.java +++ b/src/main/java/cern/c2mon/client/atmosphere/SubscriptionUpdateTagIdentifier.java @@ -1,5 +1,7 @@ package cern.c2mon.client.atmosphere; +import java.util.HashMap; + import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -12,4 +14,6 @@ import lombok.NoArgsConstructor; public class SubscriptionUpdateTagIdentifier { Long id; String name; + String metakey; + String metavalue; } diff --git a/src/main/java/cern/c2mon/client/atmosphere/TagUpdateListCodec.java b/src/main/java/cern/c2mon/client/atmosphere/TagUpdateListCodec.java new file mode 100644 index 0000000000000000000000000000000000000000..937b7d1ba39c8fa6bba571a83931a38f35c3020a --- /dev/null +++ b/src/main/java/cern/c2mon/client/atmosphere/TagUpdateListCodec.java @@ -0,0 +1,36 @@ +package cern.c2mon.client.atmosphere; + +import java.io.IOException; +import java.util.List; + +import org.atmosphere.config.managed.Decoder; +import org.atmosphere.config.managed.Encoder; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + +import cern.c2mon.client.common.tag.Tag; + +public class TagUpdateListCodec implements + Encoder<List<Tag>, String>, Decoder<String, List<Tag>> { + + private static final ObjectMapper mapper = new ObjectMapper(); + + @Override + public String encode(List<Tag> list) { + try { + return mapper.writeValueAsString(list); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public List<Tag> decode(String m) { + try { + return mapper.readValue(m, new TypeReference<List<Tag>>() {}); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +}