View Javadoc
1   package com.github.triceo.splitlog;
2   
3   import java.util.HashSet;
4   import java.util.Set;
5   import java.util.concurrent.CopyOnWriteArraySet;
6   import java.util.concurrent.atomic.AtomicBoolean;
7   
8   import org.apache.commons.collections4.BidiMap;
9   import org.apache.commons.collections4.bidimap.DualHashBidiMap;
10  import org.slf4j.Logger;
11  
12  import com.github.triceo.splitlog.api.Message;
13  import com.github.triceo.splitlog.api.MessageConsumer;
14  import com.github.triceo.splitlog.api.MessageDeliveryStatus;
15  import com.github.triceo.splitlog.api.MessageListener;
16  import com.github.triceo.splitlog.api.MessageMeasure;
17  import com.github.triceo.splitlog.api.MessageMetric;
18  import com.github.triceo.splitlog.api.MessageProducer;
19  import com.github.triceo.splitlog.logging.SplitlogLoggerFactory;
20  import com.github.triceo.splitlog.util.LogUtil;
21  import com.github.triceo.splitlog.util.LogUtil.Level;
22  
23  class ConsumerManager<P extends MessageProducer<P>> implements MessageProducer<P>, MessageConsumer<P>,
24  ConsumerRegistrar<P> {
25  
26      private static final Logger LOGGER = SplitlogLoggerFactory.getLogger(ConsumerManager.class);
27  
28      private final Set<MessageConsumer<P>> consumers = new CopyOnWriteArraySet<MessageConsumer<P>>();
29      private final AtomicBoolean isStopped = new AtomicBoolean(false);
30      private final BidiMap<String, DefaultMessageMetric<? extends Number, P>> metrics = new DualHashBidiMap<String, DefaultMessageMetric<? extends Number, P>>();
31  
32      private final P producer;
33  
34      public ConsumerManager(final P producer) {
35          this.producer = producer;
36      }
37  
38      @Override
39      public int countConsumers() {
40          return this.consumers.size();
41      }
42  
43      @Override
44      public synchronized int countMetrics() {
45          return this.metrics.size();
46      }
47  
48      @Override
49      public synchronized MessageMetric<? extends Number, P> getMetric(final String id) {
50          return this.metrics.get(id);
51      }
52  
53      @Override
54      public synchronized String getMetricId(final MessageMetric<? extends Number, P> measure) {
55          return this.metrics.getKey(measure);
56      }
57  
58      public P getProducer() {
59          return this.producer;
60      }
61  
62      @Override
63      public boolean isConsuming(final MessageConsumer<P> consumer) {
64          return this.consumers.contains(consumer);
65      }
66  
67      @Override
68      public synchronized boolean isMeasuring(final MessageMetric<? extends Number, P> metric) {
69          return this.metrics.containsValue(metric);
70      }
71  
72      @Override
73      public synchronized boolean isMeasuring(final String id) {
74          return this.metrics.containsKey(id);
75      }
76  
77      @Override
78      public boolean isStopped() {
79          return this.isStopped.get();
80      }
81  
82      @Override
83      public void messageReceived(final Message message, final MessageDeliveryStatus status, final P producer) {
84          if (this.isStopped()) {
85              throw new IllegalStateException("Consumer manager already stopped.");
86          }
87          LogUtil.newMessage(ConsumerManager.LOGGER, Level.INFO, "New message received:", message, status, producer, this);
88          for (final MessageConsumer<P> consumer : this.consumers) {
89              try {
90                  consumer.messageReceived(message, status, producer);
91              } catch (final Throwable t) {
92                  // calling user code; we need to be prepared for anything
93                  ConsumerManager.LOGGER.warn("Failed notifying {} of '{}' with status {}. Stack trace on DEBUG.",
94                          consumer, message, status, t.getMessage());
95                  ConsumerManager.LOGGER.debug("Failed notifying {} of '{}' with status {}.", consumer, message, status,
96                          t);
97              }
98          }
99      }
100 
101     @Override
102     public boolean registerConsumer(final MessageConsumer<P> consumer) {
103         if (this.isStopped()) {
104             throw new IllegalStateException("Consumer manager already stopped.");
105         } else if (this.consumers.add(consumer)) {
106             ConsumerManager.LOGGER.info("Registered consumer {} for {}.", consumer, this.producer);
107             return true;
108         } else {
109             return false;
110         }
111     }
112 
113     @Override
114     public MessageConsumer<P> startConsuming(final MessageListener<P> listener) {
115         if (listener instanceof MessageConsumer<?>) {
116             throw new IllegalArgumentException("Cannot consume consumers.");
117         }
118         final MessageConsumer<P> consumer = new DefaultMessageConsumer<P>(listener, this.producer);
119         if (this.registerConsumer(consumer)) {
120             ConsumerManager.LOGGER.info("Registered new consumer {} for {}.", consumer, listener);
121             return consumer;
122         }
123         /*
124          * we know that there is a consumer with the same properties; disregard
125          * the new and return the old instead.
126          */
127         for (final MessageConsumer<P> existing : this.consumers) {
128             if (existing.equals(consumer)) {
129                 ConsumerManager.LOGGER.info("Retrieve pre-existing consumer {} for {}.", consumer, listener);
130                 return existing;
131             }
132         }
133         throw new IllegalStateException("Unreachable code.");
134     }
135 
136     @Override
137     public synchronized <T extends Number> MessageMetric<T, P> startMeasuring(final MessageMeasure<T, P> measure,
138             final String id) {
139         if (this.isStopped()) {
140             throw new IllegalStateException("Measuring consumer manager already stopped.");
141         } else if (measure == null) {
142             throw new IllegalArgumentException("Measure may not be null.");
143         } else if (id == null) {
144             throw new IllegalArgumentException("ID may not be null.");
145         } else if (this.metrics.containsKey(id)) {
146             throw new IllegalArgumentException("Duplicate ID:" + id);
147         }
148         ConsumerManager.LOGGER.info("Starting measuring {} in {}.", id, this.getProducer());
149         final DefaultMessageMetric<T, P> metric = new DefaultMessageMetric<T, P>(this.getProducer(), measure);
150         this.metrics.put(id, metric);
151         this.registerConsumer(metric);
152         return metric;
153     }
154 
155     @Override
156     public boolean stop() {
157         if (!this.isStopped.compareAndSet(false, true)) {
158             return false;
159         }
160         ConsumerManager.LOGGER.info("Stopping consumer manager for {}.", this.producer);
161         for (final MessageConsumer<P> consumer : this.consumers) {
162             this.stopConsuming(consumer);
163         }
164         for (final String metricId : new HashSet<String>(this.metrics.keySet())) {
165             this.stopMeasuring(metricId);
166         }
167         ConsumerManager.LOGGER.info("Stopped metrics consumer manager for {}.", this.getProducer());
168         return true;
169     }
170 
171     @Override
172     public boolean stopConsuming(final MessageConsumer<P> consumer) {
173         if (this.consumers.remove(consumer)) {
174             ConsumerManager.LOGGER.info("Unregistered consumer {} for {}.", consumer, this.producer);
175             return true;
176         } else {
177             return false;
178         }
179 
180     }
181 
182     @Override
183     public synchronized boolean stopMeasuring(final MessageMetric<? extends Number, P> measure) {
184         if (!this.isMeasuring(measure)) {
185             return false;
186         }
187         return this.stopMeasuring(this.metrics.getKey(measure));
188     }
189 
190     @Override
191     public synchronized boolean stopMeasuring(final String id) {
192         if (!this.isMeasuring(id)) {
193             return false;
194         }
195         final DefaultMessageMetric<? extends Number, P> removed = this.metrics.remove(id);
196         this.stopConsuming(removed);
197         ConsumerManager.LOGGER.info("Stopped measuring {} in {}.", id, this.getProducer());
198         return true;
199     }
200 
201     @Override
202     public String toString() {
203         final StringBuilder builder = new StringBuilder();
204         builder.append("ConsumerManager [");
205         if (this.getProducer() != null) {
206             builder.append("getProducer()=").append(this.getProducer()).append(", ");
207         }
208         builder.append("isStopped()=").append(this.isStopped()).append("]");
209         return builder.toString();
210     }
211 
212 }