1 package com.github.triceo.splitlog;
2
3 import java.io.File;
4 import java.lang.ref.WeakReference;
5 import java.util.ArrayList;
6 import java.util.List;
7 import java.util.concurrent.Future;
8 import java.util.concurrent.TimeUnit;
9 import java.util.concurrent.atomic.AtomicLong;
10
11 import org.apache.commons.collections4.BidiMap;
12 import org.apache.commons.collections4.bidimap.DualHashBidiMap;
13 import org.apache.commons.lang3.tuple.ImmutablePair;
14 import org.apache.commons.lang3.tuple.Pair;
15 import org.slf4j.Logger;
16 import org.slf4j.LoggerFactory;
17
18 import com.github.triceo.splitlog.api.Follower;
19 import com.github.triceo.splitlog.api.LogWatch;
20 import com.github.triceo.splitlog.api.LogWatchBuilder;
21 import com.github.triceo.splitlog.api.Message;
22 import com.github.triceo.splitlog.api.MessageConsumer;
23 import com.github.triceo.splitlog.api.MessageDeliveryStatus;
24 import com.github.triceo.splitlog.api.MessageListener;
25 import com.github.triceo.splitlog.api.MessageMeasure;
26 import com.github.triceo.splitlog.api.MessageMetric;
27 import com.github.triceo.splitlog.api.MidDeliveryMessageCondition;
28 import com.github.triceo.splitlog.api.SimpleMessageCondition;
29 import com.github.triceo.splitlog.api.TailSplitter;
30 import com.github.triceo.splitlog.logging.SplitlogLoggerFactory;
31
32
33
34
35
36
37
38
39
40
41 final class DefaultLogWatch implements LogWatch {
42
43 private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
44
45 private static final Logger LOGGER = SplitlogLoggerFactory.getLogger(DefaultLogWatch.class);
46 static {
47
48
49
50
51 final Logger l = LoggerFactory.getLogger(DefaultLogWatch.class);
52 if (SplitlogLoggerFactory.isLoggingEnabled()) {
53 l.info("Splitlog's internal logging system can be disabled by setting '{}' system property to '{}'.",
54 SplitlogLoggerFactory.LOGGING_PROPERTY_NAME, SplitlogLoggerFactory.OFF_STATE);
55 } else {
56 l.warn("This will be the last message from Splitlog, unless you enable Splitlog's internal logging system by setting '{}' system property to '{}'.",
57 SplitlogLoggerFactory.LOGGING_PROPERTY_NAME, SplitlogLoggerFactory.ON_STATE);
58 }
59 }
60
61 private final ConsumerManager<LogWatch> consumers = new ConsumerManager<LogWatch>(this);
62 private MessageBuilder currentlyProcessedMessage;
63 private final SimpleMessageCondition gateCondition;
64 private final BidiMap<String, MessageMeasure<? extends Number, Follower>> handingDown = new DualHashBidiMap<String, MessageMeasure<? extends Number, Follower>>();
65 private boolean isTerminated = false;
66 private WeakReference<Message> previousAcceptedMessage;
67 private final TailSplitter splitter;
68 private final LogWatchStorageManager storage;
69 private final LogWatchSweepingManager sweeping;
70 private final LogWatchTailingManager tailing;
71 private final long uniqueId = DefaultLogWatch.ID_GENERATOR.getAndIncrement();
72 private final File watchedFile;
73
74 protected DefaultLogWatch(final File watchedFile, final TailSplitter splitter, final int capacity,
75 final SimpleMessageCondition gateCondition, final SimpleMessageCondition acceptanceCondition,
76 final long delayBetweenReads, final long delayBetweenSweeps, final boolean ignoreExistingContent,
77 final boolean reopenBetweenReads, final int bufferSize) {
78 this.splitter = splitter;
79 this.gateCondition = gateCondition;
80 this.storage = new LogWatchStorageManager(this, capacity, acceptanceCondition);
81 this.tailing = new LogWatchTailingManager(this, delayBetweenReads, ignoreExistingContent, reopenBetweenReads,
82 bufferSize);
83 this.sweeping = new LogWatchSweepingManager(this.storage, delayBetweenSweeps);
84 this.watchedFile = watchedFile;
85 }
86
87 synchronized void addLine(final String line) {
88 final boolean isMessageBeingProcessed = this.currentlyProcessedMessage != null;
89 if (this.splitter.isStartingLine(line)) {
90
91 if (isMessageBeingProcessed) {
92 final Message completeMessage = this.currentlyProcessedMessage.buildFinal(this.splitter);
93 final MessageDeliveryStatus accepted = this.handleCompleteMessage(completeMessage);
94 if (accepted == null) {
95 DefaultLogWatch.LOGGER.info("Message {} rejected at the gate to {}.", completeMessage, this);
96 } else if (accepted == MessageDeliveryStatus.ACCEPTED) {
97 this.previousAcceptedMessage = new WeakReference<Message>(completeMessage);
98 } else {
99 DefaultLogWatch.LOGGER.info("Message {} rejected from storage in {}.", completeMessage, this);
100 }
101 }
102
103 this.currentlyProcessedMessage = new MessageBuilder(line);
104 if (this.previousAcceptedMessage != null) {
105 this.currentlyProcessedMessage.setPreviousMessage(this.previousAcceptedMessage.get());
106 }
107 } else {
108
109 if (!isMessageBeingProcessed) {
110
111 return;
112 }
113 this.currentlyProcessedMessage.add(line);
114 }
115 this.handleIncomingMessage(this.currentlyProcessedMessage.buildIntermediate(this.splitter));
116 }
117
118 @Override
119 public int countConsumers() {
120 return this.consumers.countConsumers();
121 }
122
123
124
125
126
127
128
129
130 int countMessagesInStorage() {
131 return this.storage.getMessageStore().size();
132 }
133
134 @Override
135 public int countMetrics() {
136 return this.consumers.countMetrics();
137 }
138
139
140
141
142
143
144
145
146
147
148
149
150 protected List<Message> getAllMessages(final Follower follower) {
151 return this.storage.getAllMessages(follower);
152 }
153
154 @Override
155 public MessageMetric<? extends Number, LogWatch> getMetric(final String id) {
156 return this.consumers.getMetric(id);
157 }
158
159 @Override
160 public String getMetricId(final MessageMetric<? extends Number, LogWatch> measure) {
161 return this.consumers.getMetricId(measure);
162 }
163
164 public long getUniqueId() {
165 return this.uniqueId;
166 }
167
168 @Override
169 public File getWatchedFile() {
170 return this.watchedFile;
171 }
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186 private synchronized MessageDeliveryStatus handleCompleteMessage(final Message message) {
187 if (!this.hasToLetMessageThroughTheGate(message)) {
188 return null;
189 }
190 final boolean messageAccepted = this.storage.registerMessage(message, this);
191 final MessageDeliveryStatus status = messageAccepted ? MessageDeliveryStatus.ACCEPTED
192 : MessageDeliveryStatus.REJECTED;
193 this.consumers.messageReceived(message, status, this);
194 return status;
195 }
196
197
198
199
200
201
202
203
204
205
206
207 private synchronized boolean handleIncomingMessage(final Message message) {
208 if (!this.hasToLetMessageThroughTheGate(message)) {
209 return false;
210 }
211 this.consumers.messageReceived(message, MessageDeliveryStatus.INCOMING, this);
212 return true;
213 }
214
215
216
217
218
219
220
221
222
223
224
225
226
227 private synchronized boolean handleUndeliveredMessage(final Follower follower, final Message message) {
228 if (!this.hasToLetMessageThroughTheGate(message)) {
229 return false;
230 }
231
232 follower.messageReceived(message, MessageDeliveryStatus.INCOMPLETE, this);
233 return true;
234 }
235
236 private boolean hasToLetMessageThroughTheGate(final Message message) {
237 if (this.gateCondition.accept(message)) {
238 return true;
239 } else {
240 DefaultLogWatch.LOGGER.info("Message '{}' stopped at the gate in {}.", message, this);
241 return false;
242 }
243 }
244
245 @Override
246 public boolean isConsuming(final MessageConsumer<LogWatch> consumer) {
247 return this.consumers.isConsuming(consumer);
248 }
249
250 @Override
251 public synchronized boolean isFollowedBy(final Follower follower) {
252 return this.isConsuming(follower);
253 }
254
255 @Override
256 public boolean isHandingDown(final MessageMeasure<? extends Number, Follower> measure) {
257 return this.handingDown.containsValue(measure);
258 }
259
260 @Override
261 public boolean isHandingDown(final String id) {
262 return this.handingDown.containsKey(id);
263 }
264
265 @Override
266 public boolean isMeasuring(final MessageMetric<? extends Number, LogWatch> metric) {
267 return this.consumers.isMeasuring(metric);
268 }
269
270 @Override
271 public boolean isMeasuring(final String id) {
272 return this.consumers.isMeasuring(id);
273 }
274
275 @Override
276 public synchronized boolean isTerminated() {
277 return this.isTerminated;
278 }
279
280 @Override
281 public synchronized MessageConsumer<LogWatch> startConsuming(final MessageListener<LogWatch> consumer) {
282 final MessageConsumer<LogWatch> result = this.consumers.startConsuming(consumer);
283 this.tailing.start();
284 return result;
285 }
286
287 @Override
288 public Follower startFollowing() {
289 return this.startFollowingActually(null).getKey();
290 }
291
292 @Override
293 public Pair<Follower, Message> startFollowing(final MidDeliveryMessageCondition<LogWatch> waitFor) {
294 final Pair<Follower, Future<Message>> pair = this.startFollowingActually(waitFor);
295 final Follower f = pair.getKey();
296 try {
297 return ImmutablePair.of(f, pair.getValue().get());
298 } catch (final Exception e) {
299 return ImmutablePair.of(f, null);
300 }
301 }
302
303 @Override
304 public Pair<Follower, Message> startFollowing(final MidDeliveryMessageCondition<LogWatch> waitFor,
305 final long howLong, final TimeUnit unit) {
306 final Pair<Follower, Future<Message>> pair = this.startFollowingActually(waitFor);
307 final Follower f = pair.getKey();
308 try {
309 return ImmutablePair.of(f, pair.getValue().get(howLong, unit));
310 } catch (final Exception e) {
311 return ImmutablePair.of(f, null);
312 }
313 }
314
315
316
317
318
319
320
321 private synchronized Pair<Follower, Future<Message>> startFollowingActually(
322 final MidDeliveryMessageCondition<LogWatch> condition) {
323 if (this.isTerminated()) {
324 throw new IllegalStateException("Cannot start tailing on an already terminated LogWatch.");
325 }
326 this.sweeping.start();
327
328 final List<Pair<String, MessageMeasure<? extends Number, Follower>>> pairs = new ArrayList<Pair<String, MessageMeasure<? extends Number, Follower>>>();
329 for (final BidiMap.Entry<String, MessageMeasure<? extends Number, Follower>> entry : this.handingDown
330 .entrySet()) {
331 pairs.add(ImmutablePair.<String, MessageMeasure<? extends Number, Follower>> of(entry.getKey(),
332 entry.getValue()));
333 }
334
335 final Follower follower = new DefaultFollower(this, pairs);
336 final Future<Message> expectation = condition == null ? null : follower.expect(condition);
337 this.consumers.registerConsumer(follower);
338 this.storage.followerStarted(follower);
339 DefaultLogWatch.LOGGER.info("Registered {} for {}.", follower, this);
340 this.tailing.start();
341 return ImmutablePair.of(follower, expectation);
342 }
343
344 @Override
345 public Pair<Follower, Future<Message>> startFollowingWithExpectation(
346 final MidDeliveryMessageCondition<LogWatch> waitFor) {
347 final Pair<Follower, Future<Message>> pair = this.startFollowingActually(waitFor);
348 return ImmutablePair.of(pair.getKey(), pair.getValue());
349 }
350
351 @Override
352 public synchronized boolean startHandingDown(final MessageMeasure<? extends Number, Follower> measure,
353 final String id) {
354 if (this.isTerminated()) {
355 throw new IllegalStateException("Log watch already terminated.");
356 } else if (measure == null) {
357 throw new IllegalArgumentException("Measure may not be null.");
358 } else if (id == null) {
359 throw new IllegalArgumentException("ID may not be null.");
360 } else if (this.handingDown.containsKey(id) || this.handingDown.containsValue(measure)) {
361 return false;
362 }
363 this.handingDown.put(id, measure);
364 return true;
365 }
366
367 @Override
368 public <T extends Number> MessageMetric<T, LogWatch> startMeasuring(final MessageMeasure<T, LogWatch> measure,
369 final String id) {
370 return this.consumers.startMeasuring(measure, id);
371 }
372
373 @Override
374 public synchronized boolean stopConsuming(final MessageConsumer<LogWatch> consumer) {
375 final boolean result = this.consumers.stopConsuming(consumer);
376 if (this.countConsumers() < 1) {
377 DefaultLogWatch.LOGGER.info("Requested stopping the tailer for {}.", this);
378 this.tailing.stop();
379 this.currentlyProcessedMessage = null;
380 }
381 return result;
382 }
383
384 @Override
385 public synchronized boolean stopFollowing(final Follower follower) {
386 if (!this.isFollowedBy(follower)) {
387 return false;
388 }
389 if (this.currentlyProcessedMessage != null) {
390 this.handleUndeliveredMessage(follower, this.currentlyProcessedMessage.buildIntermediate(this.splitter));
391 }
392 this.stopConsuming(follower);
393 this.storage.followerTerminated(follower);
394 DefaultLogWatch.LOGGER.info("Unregistered {} for {}.", follower, this);
395 return true;
396 }
397
398 @Override
399 public synchronized boolean stopHandingDown(final MessageMeasure<? extends Number, Follower> measure) {
400 return (this.handingDown.removeValue(measure) != null);
401 }
402
403 @Override
404 public synchronized boolean stopHandingDown(final String id) {
405 return (this.handingDown.remove(id) != null);
406 }
407
408 @Override
409 public boolean stopMeasuring(final MessageMetric<? extends Number, LogWatch> metric) {
410 return this.consumers.stopMeasuring(metric);
411 }
412
413 @Override
414 public boolean stopMeasuring(final String id) {
415 return this.consumers.stopMeasuring(id);
416 }
417
418
419
420
421
422
423
424 @Override
425 public synchronized boolean terminate() {
426 if (this.isTerminated()) {
427 return false;
428 }
429 DefaultLogWatch.LOGGER.info("Terminating {}.", this);
430 this.isTerminated = true;
431 this.consumers.stop();
432 this.tailing.stop();
433 this.handingDown.clear();
434 this.sweeping.stop();
435 this.previousAcceptedMessage = null;
436 DefaultLogWatch.LOGGER.info("Terminated {}.", this);
437 return true;
438 }
439
440 @Override
441 public String toString() {
442 final StringBuilder builder = new StringBuilder();
443 builder.append("DefaultLogWatch [getUniqueId()=").append(this.getUniqueId()).append(", ");
444 if (this.getWatchedFile() != null) {
445 builder.append("getWatchedFile()=").append(this.getWatchedFile()).append(", ");
446 }
447 builder.append("isTerminated()=").append(this.isTerminated()).append("]");
448 return builder.toString();
449 }
450
451 }