1 package com.github.triceo.splitlog; 2 3 import java.util.Collections; 4 import java.util.SortedSet; 5 import java.util.TreeSet; 6 7 import org.slf4j.Logger; 8 import org.slf4j.LoggerFactory; 9 10 import com.github.triceo.splitlog.api.Follower; 11 import com.github.triceo.splitlog.api.MergingFollower; 12 import com.github.triceo.splitlog.api.Message; 13 import com.github.triceo.splitlog.api.MessageComparator; 14 import com.github.triceo.splitlog.api.MessageConsumer; 15 import com.github.triceo.splitlog.api.MessageDeliveryStatus; 16 import com.github.triceo.splitlog.api.MessageListener; 17 import com.github.triceo.splitlog.api.SimpleMessageCondition; 18 19 final class NonStoringMergingFollower extends AbstractMergingFollower { 20 21 private static final Logger LOGGER = LoggerFactory.getLogger(NonStoringFollower.class); 22 23 private final ConsumerManager<MergingFollower> consumers = new ConsumerManager<MergingFollower>(this); 24 25 public NonStoringMergingFollower(final Follower... followers) { 26 super(followers); 27 } 28 29 @Override 30 public int countConsumers() { 31 return this.consumers.countConsumers(); 32 } 33 34 @Override 35 public SortedSet<Message> getMessages(final SimpleMessageCondition condition, final MessageComparator order) { 36 final SortedSet<Message> sorted = new TreeSet<Message>(order); 37 for (final Follower f : this.getMerged()) { 38 for (final Message m : f.getMessages()) { 39 if (!condition.accept(m)) { 40 continue; 41 } 42 sorted.add(m); 43 } 44 } 45 return Collections.unmodifiableSortedSet(sorted); 46 } 47 48 @Override 49 public boolean isConsuming(final MessageConsumer<MergingFollower> consumer) { 50 return this.consumers.isConsuming(consumer); 51 } 52 53 @Override 54 public void messageReceived(final Message msg, final MessageDeliveryStatus status, final Follower source) { 55 if (this.isStopped()) { 56 throw new IllegalStateException("Follower already stopped."); 57 } else if (!this.getMerged().contains(source)) { 58 throw new IllegalArgumentException("Forbidden notification source: " + source); 59 } 60 NonStoringMergingFollower.LOGGER.info("{} notified of '{}' with status {} by {}.", this, msg, status, source); 61 this.getExchange().messageReceived(msg, status, source); 62 this.consumers.messageReceived(msg, status, this); 63 } 64 65 @Override 66 public MessageConsumer<MergingFollower> startConsuming(final MessageListener<MergingFollower> consumer) { 67 return this.consumers.startConsuming(consumer); 68 } 69 70 @Override 71 public boolean stopConsuming(final MessageConsumer<MergingFollower> consumer) { 72 return this.consumers.stopConsuming(consumer); 73 } 74 75 @Override 76 public String toString() { 77 final StringBuilder builder = new StringBuilder(); 78 builder.append("NonStoringMergingFollower ["); 79 if (this.getMerged() != null) { 80 builder.append("getMerged()=").append(this.getMerged()).append(", "); 81 } 82 builder.append("isStopped()=").append(this.isStopped()).append("]"); 83 return builder.toString(); 84 } 85 86 }