1 package com.github.triceo.splitlog;
2
3 import java.util.HashSet;
4 import java.util.Set;
5 import java.util.concurrent.CopyOnWriteArraySet;
6 import java.util.concurrent.atomic.AtomicBoolean;
7
8 import org.apache.commons.collections4.BidiMap;
9 import org.apache.commons.collections4.bidimap.DualHashBidiMap;
10 import org.slf4j.Logger;
11
12 import com.github.triceo.splitlog.api.Message;
13 import com.github.triceo.splitlog.api.MessageConsumer;
14 import com.github.triceo.splitlog.api.MessageDeliveryStatus;
15 import com.github.triceo.splitlog.api.MessageListener;
16 import com.github.triceo.splitlog.api.MessageMeasure;
17 import com.github.triceo.splitlog.api.MessageMetric;
18 import com.github.triceo.splitlog.api.MessageProducer;
19 import com.github.triceo.splitlog.logging.SplitlogLoggerFactory;
20 import com.github.triceo.splitlog.util.LogUtil;
21 import com.github.triceo.splitlog.util.LogUtil.Level;
22
23 class ConsumerManager<P extends MessageProducer<P>> implements MessageProducer<P>, MessageConsumer<P>,
24 ConsumerRegistrar<P> {
25
26 private static final Logger LOGGER = SplitlogLoggerFactory.getLogger(ConsumerManager.class);
27
28 private final Set<MessageConsumer<P>> consumers = new CopyOnWriteArraySet<MessageConsumer<P>>();
29 private final AtomicBoolean isStopped = new AtomicBoolean(false);
30 private final BidiMap<String, DefaultMessageMetric<? extends Number, P>> metrics = new DualHashBidiMap<String, DefaultMessageMetric<? extends Number, P>>();
31
32 private final P producer;
33
34 public ConsumerManager(final P producer) {
35 this.producer = producer;
36 }
37
38 @Override
39 public int countConsumers() {
40 return this.consumers.size();
41 }
42
43 @Override
44 public synchronized int countMetrics() {
45 return this.metrics.size();
46 }
47
48 @Override
49 public synchronized MessageMetric<? extends Number, P> getMetric(final String id) {
50 return this.metrics.get(id);
51 }
52
53 @Override
54 public synchronized String getMetricId(final MessageMetric<? extends Number, P> measure) {
55 return this.metrics.getKey(measure);
56 }
57
58 public P getProducer() {
59 return this.producer;
60 }
61
62 @Override
63 public boolean isConsuming(final MessageConsumer<P> consumer) {
64 return this.consumers.contains(consumer);
65 }
66
67 @Override
68 public synchronized boolean isMeasuring(final MessageMetric<? extends Number, P> metric) {
69 return this.metrics.containsValue(metric);
70 }
71
72 @Override
73 public synchronized boolean isMeasuring(final String id) {
74 return this.metrics.containsKey(id);
75 }
76
77 @Override
78 public boolean isStopped() {
79 return this.isStopped.get();
80 }
81
82 @Override
83 public void messageReceived(final Message message, final MessageDeliveryStatus status, final P producer) {
84 if (this.isStopped()) {
85 throw new IllegalStateException("Consumer manager already stopped.");
86 }
87 LogUtil.newMessage(ConsumerManager.LOGGER, Level.INFO, "New message received:", message, status, producer, this);
88 for (final MessageConsumer<P> consumer : this.consumers) {
89 try {
90 consumer.messageReceived(message, status, producer);
91 } catch (final Throwable t) {
92
93 ConsumerManager.LOGGER.warn("Failed notifying {} of '{}' with status {}. Stack trace on DEBUG.",
94 consumer, message, status, t.getMessage());
95 ConsumerManager.LOGGER.debug("Failed notifying {} of '{}' with status {}.", consumer, message, status,
96 t);
97 }
98 }
99 }
100
101 @Override
102 public boolean registerConsumer(final MessageConsumer<P> consumer) {
103 if (this.isStopped()) {
104 throw new IllegalStateException("Consumer manager already stopped.");
105 } else if (this.consumers.add(consumer)) {
106 ConsumerManager.LOGGER.info("Registered consumer {} for {}.", consumer, this.producer);
107 return true;
108 } else {
109 return false;
110 }
111 }
112
113 @Override
114 public MessageConsumer<P> startConsuming(final MessageListener<P> listener) {
115 if (listener instanceof MessageConsumer<?>) {
116 throw new IllegalArgumentException("Cannot consume consumers.");
117 }
118 final MessageConsumer<P> consumer = new DefaultMessageConsumer<P>(listener, this.producer);
119 if (this.registerConsumer(consumer)) {
120 ConsumerManager.LOGGER.info("Registered new consumer {} for {}.", consumer, listener);
121 return consumer;
122 }
123
124
125
126
127 for (final MessageConsumer<P> existing : this.consumers) {
128 if (existing.equals(consumer)) {
129 ConsumerManager.LOGGER.info("Retrieve pre-existing consumer {} for {}.", consumer, listener);
130 return existing;
131 }
132 }
133 throw new IllegalStateException("Unreachable code.");
134 }
135
136 @Override
137 public synchronized <T extends Number> MessageMetric<T, P> startMeasuring(final MessageMeasure<T, P> measure,
138 final String id) {
139 if (this.isStopped()) {
140 throw new IllegalStateException("Measuring consumer manager already stopped.");
141 } else if (measure == null) {
142 throw new IllegalArgumentException("Measure may not be null.");
143 } else if (id == null) {
144 throw new IllegalArgumentException("ID may not be null.");
145 } else if (this.metrics.containsKey(id)) {
146 throw new IllegalArgumentException("Duplicate ID:" + id);
147 }
148 ConsumerManager.LOGGER.info("Starting measuring {} in {}.", id, this.getProducer());
149 final DefaultMessageMetric<T, P> metric = new DefaultMessageMetric<T, P>(this.getProducer(), measure);
150 this.metrics.put(id, metric);
151 this.registerConsumer(metric);
152 return metric;
153 }
154
155 @Override
156 public boolean stop() {
157 if (!this.isStopped.compareAndSet(false, true)) {
158 return false;
159 }
160 ConsumerManager.LOGGER.info("Stopping consumer manager for {}.", this.producer);
161 for (final MessageConsumer<P> consumer : this.consumers) {
162 this.stopConsuming(consumer);
163 }
164 for (final String metricId : new HashSet<String>(this.metrics.keySet())) {
165 this.stopMeasuring(metricId);
166 }
167 ConsumerManager.LOGGER.info("Stopped metrics consumer manager for {}.", this.getProducer());
168 return true;
169 }
170
171 @Override
172 public boolean stopConsuming(final MessageConsumer<P> consumer) {
173 if (this.consumers.remove(consumer)) {
174 ConsumerManager.LOGGER.info("Unregistered consumer {} for {}.", consumer, this.producer);
175 return true;
176 } else {
177 return false;
178 }
179
180 }
181
182 @Override
183 public synchronized boolean stopMeasuring(final MessageMetric<? extends Number, P> measure) {
184 if (!this.isMeasuring(measure)) {
185 return false;
186 }
187 return this.stopMeasuring(this.metrics.getKey(measure));
188 }
189
190 @Override
191 public synchronized boolean stopMeasuring(final String id) {
192 if (!this.isMeasuring(id)) {
193 return false;
194 }
195 final DefaultMessageMetric<? extends Number, P> removed = this.metrics.remove(id);
196 this.stopConsuming(removed);
197 ConsumerManager.LOGGER.info("Stopped measuring {} in {}.", id, this.getProducer());
198 return true;
199 }
200
201 @Override
202 public String toString() {
203 final StringBuilder builder = new StringBuilder();
204 builder.append("ConsumerManager [");
205 if (this.getProducer() != null) {
206 builder.append("getProducer()=").append(this.getProducer()).append(", ");
207 }
208 builder.append("isStopped()=").append(this.isStopped()).append("]");
209 return builder.toString();
210 }
211
212 }