View Javadoc

1   package com.github.triceo.splitlog;
2   
3   import java.util.Collections;
4   import java.util.SortedSet;
5   import java.util.TreeSet;
6   
7   import org.slf4j.Logger;
8   import org.slf4j.LoggerFactory;
9   
10  import com.github.triceo.splitlog.api.Follower;
11  import com.github.triceo.splitlog.api.MergingFollower;
12  import com.github.triceo.splitlog.api.Message;
13  import com.github.triceo.splitlog.api.MessageComparator;
14  import com.github.triceo.splitlog.api.MessageConsumer;
15  import com.github.triceo.splitlog.api.MessageDeliveryStatus;
16  import com.github.triceo.splitlog.api.MessageListener;
17  import com.github.triceo.splitlog.api.SimpleMessageCondition;
18  
19  final class NonStoringMergingFollower extends AbstractMergingFollower {
20  
21      private static final Logger LOGGER = LoggerFactory.getLogger(NonStoringFollower.class);
22  
23      private final ConsumerManager<MergingFollower> consumers = new ConsumerManager<MergingFollower>(this);
24  
25      public NonStoringMergingFollower(final Follower... followers) {
26          super(followers);
27      }
28  
29      @Override
30      public int countConsumers() {
31          return this.consumers.countConsumers();
32      }
33  
34      @Override
35      public SortedSet<Message> getMessages(final SimpleMessageCondition condition, final MessageComparator order) {
36          final SortedSet<Message> sorted = new TreeSet<Message>(order);
37          for (final Follower f : this.getMerged()) {
38              for (final Message m : f.getMessages()) {
39                  if (!condition.accept(m)) {
40                      continue;
41                  }
42                  sorted.add(m);
43              }
44          }
45          return Collections.unmodifiableSortedSet(sorted);
46      }
47  
48      @Override
49      public boolean isConsuming(final MessageConsumer<MergingFollower> consumer) {
50          return this.consumers.isConsuming(consumer);
51      }
52  
53      @Override
54      public void messageReceived(final Message msg, final MessageDeliveryStatus status, final Follower source) {
55          if (this.isStopped()) {
56              throw new IllegalStateException("Follower already stopped.");
57          } else if (!this.getMerged().contains(source)) {
58              throw new IllegalArgumentException("Forbidden notification source: " + source);
59          }
60          NonStoringMergingFollower.LOGGER.info("{} notified of '{}' with status {} by {}.", this, msg, status, source);
61          this.getExchange().messageReceived(msg, status, source);
62          this.consumers.messageReceived(msg, status, this);
63      }
64  
65      @Override
66      public MessageConsumer<MergingFollower> startConsuming(final MessageListener<MergingFollower> consumer) {
67          return this.consumers.startConsuming(consumer);
68      }
69  
70      @Override
71      public boolean stopConsuming(final MessageConsumer<MergingFollower> consumer) {
72          return this.consumers.stopConsuming(consumer);
73      }
74  
75      @Override
76      public String toString() {
77          final StringBuilder builder = new StringBuilder();
78          builder.append("NonStoringMergingFollower [");
79          if (this.getMerged() != null) {
80              builder.append("getMerged()=").append(this.getMerged()).append(", ");
81          }
82          builder.append("isStopped()=").append(this.isStopped()).append("]");
83          return builder.toString();
84      }
85  
86  }