1 package com.github.triceo.splitlog;
2
3 import java.io.File;
4 import java.lang.ref.WeakReference;
5 import java.util.ArrayList;
6 import java.util.List;
7 import java.util.concurrent.Future;
8 import java.util.concurrent.TimeUnit;
9 import java.util.concurrent.atomic.AtomicLong;
10
11 import org.apache.commons.collections4.BidiMap;
12 import org.apache.commons.collections4.bidimap.DualHashBidiMap;
13 import org.apache.commons.lang3.tuple.ImmutablePair;
14 import org.apache.commons.lang3.tuple.Pair;
15 import org.slf4j.Logger;
16
17 import com.github.triceo.splitlog.api.Follower;
18 import com.github.triceo.splitlog.api.LogWatch;
19 import com.github.triceo.splitlog.api.LogWatchBuilder;
20 import com.github.triceo.splitlog.api.Message;
21 import com.github.triceo.splitlog.api.MessageConsumer;
22 import com.github.triceo.splitlog.api.MessageDeliveryStatus;
23 import com.github.triceo.splitlog.api.MessageListener;
24 import com.github.triceo.splitlog.api.MessageMeasure;
25 import com.github.triceo.splitlog.api.MessageMetric;
26 import com.github.triceo.splitlog.api.MidDeliveryMessageCondition;
27 import com.github.triceo.splitlog.api.SimpleMessageCondition;
28 import com.github.triceo.splitlog.api.TailSplitter;
29 import com.github.triceo.splitlog.logging.SplitlogLoggerFactory;
30
31
32
33
34
35
36
37
38
39
40 final class DefaultLogWatch implements LogWatch {
41
42 private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
43 private static final Logger LOGGER = SplitlogLoggerFactory.getLogger(DefaultLogWatch.class);
44
45 private final ConsumerManager<LogWatch> consumers;
46 private MessageBuilder currentlyProcessedMessage;
47 private final SimpleMessageCondition gateCondition;
48 private final BidiMap<String, MessageMeasure<? extends Number, Follower>> handingDown = new DualHashBidiMap<String, MessageMeasure<? extends Number, Follower>>();
49 private boolean isTerminated = false;
50 private WeakReference<Message> previousAcceptedMessage;
51 private final TailSplitter splitter;
52 private final LogWatchStorageManager storage;
53 private final long uniqueId = DefaultLogWatch.ID_GENERATOR.getAndIncrement();
54 private final File watchedFile;
55
56 protected DefaultLogWatch(final LogWatchBuilder builder, final TailSplitter splitter) {
57 this.splitter = splitter;
58 this.gateCondition = builder.getGateCondition();
59 this.storage = new LogWatchStorageManager(this, builder.getCapacityLimit(), builder.getStorageCondition());
60 this.consumers = new LogWatchConsumerManager(builder, this.storage, this);
61 this.watchedFile = builder.getFileToWatch();
62 }
63
64 synchronized void addLine(final String line) {
65 final boolean isMessageBeingProcessed = this.currentlyProcessedMessage != null;
66 if (this.splitter.isStartingLine(line)) {
67
68 if (isMessageBeingProcessed) {
69 final Message completeMessage = this.currentlyProcessedMessage.buildFinal(this.splitter);
70 final MessageDeliveryStatus accepted = this.handleCompleteMessage(completeMessage);
71 if (accepted == null) {
72 DefaultLogWatch.LOGGER.info("Message {} rejected at the gate to {}.", completeMessage, this);
73 } else if (accepted == MessageDeliveryStatus.ACCEPTED) {
74 this.previousAcceptedMessage = new WeakReference<Message>(completeMessage);
75 } else {
76 DefaultLogWatch.LOGGER.info("Message {} rejected from storage in {}.", completeMessage, this);
77 }
78 }
79
80 this.currentlyProcessedMessage = new MessageBuilder(line);
81 if (this.previousAcceptedMessage != null) {
82 this.currentlyProcessedMessage.setPreviousMessage(this.previousAcceptedMessage.get());
83 }
84 } else {
85
86 if (!isMessageBeingProcessed) {
87
88 return;
89 }
90 this.currentlyProcessedMessage.add(line);
91 }
92 this.handleIncomingMessage(this.currentlyProcessedMessage.buildIntermediate(this.splitter));
93 }
94
95 @Override
96 public int countConsumers() {
97 return this.consumers.countConsumers();
98 }
99
100
101
102
103
104
105
106
107 int countMessagesInStorage() {
108 return this.storage.getMessageStore().size();
109 }
110
111 @Override
112 public int countMetrics() {
113 return this.consumers.countMetrics();
114 }
115
116
117
118
119
120
121
122
123
124
125
126
127 protected List<Message> getAllMessages(final Follower follower) {
128 return this.storage.getAllMessages(follower);
129 }
130
131 @Override
132 public MessageMetric<? extends Number, LogWatch> getMetric(final String id) {
133 return this.consumers.getMetric(id);
134 }
135
136 @Override
137 public String getMetricId(final MessageMetric<? extends Number, LogWatch> measure) {
138 return this.consumers.getMetricId(measure);
139 }
140
141 public long getUniqueId() {
142 return this.uniqueId;
143 }
144
145 @Override
146 public File getWatchedFile() {
147 return this.watchedFile;
148 }
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163 private synchronized MessageDeliveryStatus handleCompleteMessage(final Message message) {
164 if (!this.hasToLetMessageThroughTheGate(message)) {
165 return null;
166 }
167 final boolean messageAccepted = this.storage.registerMessage(message, this);
168 final MessageDeliveryStatus status = messageAccepted ? MessageDeliveryStatus.ACCEPTED
169 : MessageDeliveryStatus.REJECTED;
170 this.consumers.messageReceived(message, status, this);
171 return status;
172 }
173
174
175
176
177
178
179
180
181
182
183
184 private synchronized boolean handleIncomingMessage(final Message message) {
185 if (!this.hasToLetMessageThroughTheGate(message)) {
186 return false;
187 }
188 this.consumers.messageReceived(message, MessageDeliveryStatus.INCOMING, this);
189 return true;
190 }
191
192
193
194
195
196
197
198
199
200
201
202
203
204 private synchronized boolean handleUndeliveredMessage(final Follower follower, final Message message) {
205 if (!this.hasToLetMessageThroughTheGate(message)) {
206 return false;
207 }
208
209 follower.messageReceived(message, MessageDeliveryStatus.INCOMPLETE, this);
210 return true;
211 }
212
213 private boolean hasToLetMessageThroughTheGate(final Message message) {
214 if (this.gateCondition.accept(message)) {
215 return true;
216 } else {
217 DefaultLogWatch.LOGGER.info("Message '{}' stopped at the gate in {}.", message, this);
218 return false;
219 }
220 }
221
222 @Override
223 public boolean isConsuming(final MessageConsumer<LogWatch> consumer) {
224 return this.consumers.isConsuming(consumer);
225 }
226
227 @Override
228 public synchronized boolean isFollowedBy(final Follower follower) {
229 return this.isConsuming(follower);
230 }
231
232 @Override
233 public boolean isHandingDown(final MessageMeasure<? extends Number, Follower> measure) {
234 return this.handingDown.containsValue(measure);
235 }
236
237 @Override
238 public boolean isHandingDown(final String id) {
239 return this.handingDown.containsKey(id);
240 }
241
242 @Override
243 public boolean isMeasuring(final MessageMetric<? extends Number, LogWatch> metric) {
244 return this.consumers.isMeasuring(metric);
245 }
246
247 @Override
248 public boolean isMeasuring(final String id) {
249 return this.consumers.isMeasuring(id);
250 }
251
252 @Override
253 public synchronized boolean isTerminated() {
254 return this.isTerminated;
255 }
256
257 @Override
258 public synchronized MessageConsumer<LogWatch> startConsuming(final MessageListener<LogWatch> consumer) {
259 return this.consumers.startConsuming(consumer);
260 }
261
262 @Override
263 public Follower startFollowing() {
264 return this.startFollowingActually(null).getKey();
265 }
266
267 @Override
268 public Pair<Follower, Message> startFollowing(final MidDeliveryMessageCondition<LogWatch> waitFor) {
269 final Pair<Follower, Future<Message>> pair = this.startFollowingActually(waitFor);
270 final Follower f = pair.getKey();
271 try {
272 return ImmutablePair.of(f, pair.getValue().get());
273 } catch (final Exception e) {
274 return ImmutablePair.of(f, null);
275 }
276 }
277
278 @Override
279 public Pair<Follower, Message> startFollowing(final MidDeliveryMessageCondition<LogWatch> waitFor,
280 final long howLong, final TimeUnit unit) {
281 final Pair<Follower, Future<Message>> pair = this.startFollowingActually(waitFor);
282 final Follower f = pair.getKey();
283 try {
284 return ImmutablePair.of(f, pair.getValue().get(howLong, unit));
285 } catch (final Exception e) {
286 return ImmutablePair.of(f, null);
287 }
288 }
289
290
291
292
293
294
295
296 private synchronized Pair<Follower, Future<Message>> startFollowingActually(
297 final MidDeliveryMessageCondition<LogWatch> condition) {
298 if (this.isTerminated()) {
299 throw new IllegalStateException("Cannot start tailing on an already terminated LogWatch.");
300 }
301
302 final List<Pair<String, MessageMeasure<? extends Number, Follower>>> pairs = new ArrayList<Pair<String, MessageMeasure<? extends Number, Follower>>>();
303 for (final BidiMap.Entry<String, MessageMeasure<? extends Number, Follower>> entry : this.handingDown
304 .entrySet()) {
305 pairs.add(ImmutablePair.<String, MessageMeasure<? extends Number, Follower>> of(entry.getKey(),
306 entry.getValue()));
307 }
308
309 final Follower follower = new DefaultFollower(this, pairs);
310 final Future<Message> expectation = condition == null ? null : follower.expect(condition);
311 this.consumers.registerConsumer(follower);
312 this.storage.followerStarted(follower);
313 DefaultLogWatch.LOGGER.info("Registered {} for {}.", follower, this);
314 return ImmutablePair.of(follower, expectation);
315 }
316
317 @Override
318 public Pair<Follower, Future<Message>> startFollowingWithExpectation(
319 final MidDeliveryMessageCondition<LogWatch> waitFor) {
320 final Pair<Follower, Future<Message>> pair = this.startFollowingActually(waitFor);
321 return ImmutablePair.of(pair.getKey(), pair.getValue());
322 }
323
324 @Override
325 public synchronized boolean startHandingDown(final MessageMeasure<? extends Number, Follower> measure,
326 final String id) {
327 if (this.isTerminated()) {
328 throw new IllegalStateException("Log watch already terminated.");
329 } else if (measure == null) {
330 throw new IllegalArgumentException("Measure may not be null.");
331 } else if (id == null) {
332 throw new IllegalArgumentException("ID may not be null.");
333 } else if (this.handingDown.containsKey(id) || this.handingDown.containsValue(measure)) {
334 return false;
335 }
336 this.handingDown.put(id, measure);
337 return true;
338 }
339
340 @Override
341 public <T extends Number> MessageMetric<T, LogWatch> startMeasuring(final MessageMeasure<T, LogWatch> measure,
342 final String id) {
343 return this.consumers.startMeasuring(measure, id);
344 }
345
346 @Override
347 public synchronized boolean stopConsuming(final MessageConsumer<LogWatch> consumer) {
348 final boolean result = this.consumers.stopConsuming(consumer);
349 if (this.countConsumers() < 1) {
350 this.currentlyProcessedMessage = null;
351 }
352 return result;
353 }
354
355 @Override
356 public synchronized boolean stopFollowing(final Follower follower) {
357 if (!this.isFollowedBy(follower)) {
358 return false;
359 }
360 if (this.currentlyProcessedMessage != null) {
361 this.handleUndeliveredMessage(follower, this.currentlyProcessedMessage.buildIntermediate(this.splitter));
362 }
363 this.stopConsuming(follower);
364 this.storage.followerTerminated(follower);
365 DefaultLogWatch.LOGGER.info("Unregistered {} for {}.", follower, this);
366 return true;
367 }
368
369 @Override
370 public synchronized boolean stopHandingDown(final MessageMeasure<? extends Number, Follower> measure) {
371 return (this.handingDown.removeValue(measure) != null);
372 }
373
374 @Override
375 public synchronized boolean stopHandingDown(final String id) {
376 return (this.handingDown.remove(id) != null);
377 }
378
379 @Override
380 public boolean stopMeasuring(final MessageMetric<? extends Number, LogWatch> metric) {
381 return this.consumers.stopMeasuring(metric);
382 }
383
384 @Override
385 public boolean stopMeasuring(final String id) {
386 return this.consumers.stopMeasuring(id);
387 }
388
389
390
391
392
393
394
395 @Override
396 public synchronized boolean terminate() {
397 if (this.isTerminated()) {
398 return false;
399 }
400 DefaultLogWatch.LOGGER.info("Terminating {}.", this);
401 this.isTerminated = true;
402 this.consumers.stop();
403 this.handingDown.clear();
404 this.previousAcceptedMessage = null;
405 DefaultLogWatch.LOGGER.info("Terminated {}.", this);
406 return true;
407 }
408
409 @Override
410 public String toString() {
411 final StringBuilder builder = new StringBuilder();
412 builder.append("DefaultLogWatch [getUniqueId()=").append(this.getUniqueId()).append(", ");
413 if (this.getWatchedFile() != null) {
414 builder.append("getWatchedFile()=").append(this.getWatchedFile()).append(", ");
415 }
416 builder.append("isTerminated()=").append(this.isTerminated()).append("]");
417 return builder.toString();
418 }
419
420 }