1 package com.github.triceo.splitlog;
2
3 import java.io.OutputStream;
4 import java.util.SortedSet;
5 import java.util.concurrent.Future;
6 import java.util.concurrent.TimeUnit;
7 import java.util.concurrent.atomic.AtomicLong;
8
9 import com.github.triceo.splitlog.api.CommonFollower;
10 import com.github.triceo.splitlog.api.LogWatch;
11 import com.github.triceo.splitlog.api.Message;
12 import com.github.triceo.splitlog.api.MessageAction;
13 import com.github.triceo.splitlog.api.MessageComparator;
14 import com.github.triceo.splitlog.api.MessageConsumer;
15 import com.github.triceo.splitlog.api.MessageFormatter;
16 import com.github.triceo.splitlog.api.MessageListener;
17 import com.github.triceo.splitlog.api.MessageMeasure;
18 import com.github.triceo.splitlog.api.MessageMetric;
19 import com.github.triceo.splitlog.api.MessageProducer;
20 import com.github.triceo.splitlog.api.MidDeliveryMessageCondition;
21 import com.github.triceo.splitlog.api.SimpleMessageCondition;
22 import com.github.triceo.splitlog.conditions.AllFollowerMessagesAcceptingCondition;
23 import com.github.triceo.splitlog.expectations.MidDeliveryExpectationManager;
24 import com.github.triceo.splitlog.ordering.OriginalOrderingMessageComprator;
25
26
27
28
29
30
31
32
33
34
35
36
37 abstract class AbstractCommonFollower<P extends MessageProducer<P>, C extends MessageProducer<C>> implements
38 CommonFollower<P, C>, ConsumerRegistrar<P> {
39
40 private static final MessageComparator DEFAULT_COMPARATOR = OriginalOrderingMessageComprator.INSTANCE;
41 private static final SimpleMessageCondition DEFAULT_CONDITION = AllFollowerMessagesAcceptingCondition.INSTANCE;
42
43 private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
44
45 private final MidDeliveryExpectationManager<C> expectations = new MidDeliveryExpectationManager<C>();
46
47 private final long uniqueId = AbstractCommonFollower.ID_GENERATOR.getAndIncrement();
48
49 @Override
50 public int countConsumers() {
51 return this.getConsumerManager().countConsumers();
52 }
53
54 @Override
55 public int countMetrics() {
56 return this.getConsumerManager().countMetrics();
57 }
58
59 @Override
60 public Future<Message> expect(final MidDeliveryMessageCondition<C> condition) {
61 return this.expectations.setExpectation(condition);
62 }
63
64 @Override
65 public Future<Message> expect(final MidDeliveryMessageCondition<C> condition, final MessageAction<C> action) {
66 return this.expectations.setExpectation(condition, action);
67 }
68
69 protected abstract ConsumerManager<P> getConsumerManager();
70
71
72
73
74
75
76 protected abstract MessageFormatter getDefaultFormatter();
77
78 protected MidDeliveryExpectationManager<C> getExpectationManager() {
79 return this.expectations;
80 }
81
82 @Override
83 public SortedSet<Message> getMessages() {
84 return this.getMessages(AbstractCommonFollower.DEFAULT_COMPARATOR);
85 }
86
87 @Override
88 public SortedSet<Message> getMessages(final MessageComparator order) {
89 return this.getMessages(AbstractCommonFollower.DEFAULT_CONDITION, order);
90 }
91
92 @Override
93 public SortedSet<Message> getMessages(final SimpleMessageCondition condition) {
94 return this.getMessages(condition, AbstractCommonFollower.DEFAULT_COMPARATOR);
95 }
96
97 @Override
98 public MessageMetric<? extends Number, P> getMetric(final String id) {
99 return this.getConsumerManager().getMetric(id);
100 }
101
102 @Override
103 public String getMetricId(final MessageMetric<? extends Number, P> measure) {
104 return this.getConsumerManager().getMetricId(measure);
105 }
106
107 public long getUniqueId() {
108 return this.uniqueId;
109 }
110
111 @Override
112 public boolean isConsuming(final MessageConsumer<P> consumer) {
113 return this.getConsumerManager().isConsuming(consumer);
114 }
115
116 @Override
117 public boolean isMeasuring(final MessageMetric<? extends Number, P> metric) {
118 return this.getConsumerManager().isMeasuring(metric);
119 }
120
121 @Override
122 public boolean isMeasuring(final String id) {
123 return this.getConsumerManager().isMeasuring(id);
124 }
125
126 @Override
127 public boolean registerConsumer(final MessageConsumer<P> consumer) {
128 return this.getConsumerManager().registerConsumer(consumer);
129 }
130
131 @Override
132 public MessageConsumer<P> startConsuming(final MessageListener<P> listener) {
133 return this.getConsumerManager().startConsuming(listener);
134 }
135
136 @Override
137 public <T extends Number> MessageMetric<T, P> startMeasuring(final MessageMeasure<T, P> measure, final String id) {
138 return this.startMeasuring(measure, id, true);
139 }
140
141 protected synchronized <T extends Number> MessageMetric<T, P> startMeasuring(final MessageMeasure<T, P> measure,
142 final String id, final boolean checkIfFollowing) {
143 if (checkIfFollowing && this.isStopped()) {
144 throw new IllegalStateException("Cannot start measurement as the follower is no longer active.");
145 }
146 return this.getConsumerManager().startMeasuring(measure, id);
147 }
148
149 @Override
150 public boolean stopConsuming(final MessageConsumer<P> consumer) {
151 return this.getConsumerManager().stopConsuming(consumer);
152 }
153
154 @Override
155 public boolean stopMeasuring(final MessageMetric<? extends Number, P> metric) {
156 return this.getConsumerManager().stopMeasuring(metric);
157 }
158
159 @Override
160 public boolean stopMeasuring(final String id) {
161 return this.getConsumerManager().stopMeasuring(id);
162 }
163
164 @Override
165 public Message waitFor(final MidDeliveryMessageCondition<C> condition) {
166 try {
167 return this.expect(condition).get();
168 } catch (final Exception e) {
169 return null;
170 }
171 }
172
173 @Override
174 public Message waitFor(final MidDeliveryMessageCondition<C> condition, final long timeout, final TimeUnit unit) {
175 try {
176 return this.expect(condition).get(timeout, unit);
177 } catch (final Exception e) {
178 return null;
179 }
180 }
181
182 @Override
183 public boolean write(final OutputStream stream) {
184 return this.write(stream, this.getDefaultFormatter());
185 }
186
187 @Override
188 public boolean write(final OutputStream stream, final MessageComparator order) {
189 return this.write(stream, order, this.getDefaultFormatter());
190 }
191
192 @Override
193 public boolean write(final OutputStream stream, final MessageComparator order, final MessageFormatter formatter) {
194 return this.write(stream, AbstractCommonFollower.DEFAULT_CONDITION, order, formatter);
195 }
196
197 @Override
198 public boolean write(final OutputStream stream, final MessageFormatter formatter) {
199 return this.write(stream, AbstractCommonFollower.DEFAULT_CONDITION, formatter);
200 }
201
202 @Override
203 public boolean write(final OutputStream stream, final SimpleMessageCondition condition) {
204 return this.write(stream, condition, this.getDefaultFormatter());
205 }
206
207 @Override
208 public boolean write(final OutputStream stream, final SimpleMessageCondition condition,
209 final MessageComparator order) {
210 return this.write(stream, condition, order, this.getDefaultFormatter());
211 }
212
213 @Override
214 public boolean write(final OutputStream stream, final SimpleMessageCondition condition,
215 final MessageFormatter formatter) {
216 return this.write(stream, condition, AbstractCommonFollower.DEFAULT_COMPARATOR, formatter);
217 }
218
219 }