1 package com.github.triceo.splitlog;
2
3 import java.io.File;
4 import java.lang.ref.WeakReference;
5 import java.util.ArrayList;
6 import java.util.List;
7 import java.util.concurrent.TimeUnit;
8 import java.util.concurrent.atomic.AtomicLong;
9
10 import org.apache.commons.collections4.BidiMap;
11 import org.apache.commons.collections4.bidimap.DualHashBidiMap;
12 import org.apache.commons.lang3.tuple.ImmutablePair;
13 import org.apache.commons.lang3.tuple.Pair;
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
16
17 import com.github.triceo.splitlog.api.Follower;
18 import com.github.triceo.splitlog.api.LogWatch;
19 import com.github.triceo.splitlog.api.Message;
20 import com.github.triceo.splitlog.api.MessageConsumer;
21 import com.github.triceo.splitlog.api.MessageDeliveryStatus;
22 import com.github.triceo.splitlog.api.MessageListener;
23 import com.github.triceo.splitlog.api.MessageMeasure;
24 import com.github.triceo.splitlog.api.MessageMetric;
25 import com.github.triceo.splitlog.api.MidDeliveryMessageCondition;
26 import com.github.triceo.splitlog.api.SimpleMessageCondition;
27 import com.github.triceo.splitlog.api.TailSplitter;
28
29
30
31
32
33
34
35
36
37
38 final class DefaultLogWatch implements LogWatch {
39
40 private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
41 private static final Logger LOGGER = LoggerFactory.getLogger(DefaultLogWatch.class);
42
43 private final MeasuringConsumerManager<LogWatch> consumers = new MeasuringConsumerManager<LogWatch>(this);
44 private MessageBuilder currentlyProcessedMessage;
45 private final BidiMap<String, MessageMeasure<? extends Number, Follower>> handingDown = new DualHashBidiMap<String, MessageMeasure<? extends Number, Follower>>();
46 private boolean isTerminated = false;
47 private WeakReference<Message> previousAcceptedMessage;
48 private final TailSplitter splitter;
49 private final LogWatchStorageManager storage;
50 private final LogWatchSweepingManager sweeping;
51 private final LogWatchTailingManager tailing;
52 private final long uniqueId = DefaultLogWatch.ID_GENERATOR.getAndIncrement();
53 private final File watchedFile;
54
55 protected DefaultLogWatch(final File watchedFile, final TailSplitter splitter, final int capacity,
56 final SimpleMessageCondition acceptanceCondition, final long delayBetweenReads,
57 final long delayBetweenSweeps, final boolean ignoreExistingContent, final boolean reopenBetweenReads,
58 final int bufferSize, final long delayForTailerStart) {
59 this.splitter = splitter;
60 this.storage = new LogWatchStorageManager(this, capacity, acceptanceCondition);
61 this.tailing = new LogWatchTailingManager(this, delayBetweenReads, delayForTailerStart, ignoreExistingContent,
62 reopenBetweenReads, bufferSize);
63 this.sweeping = new LogWatchSweepingManager(this.storage, delayBetweenSweeps);
64 this.watchedFile = watchedFile;
65 }
66
67 synchronized void addLine(final String line) {
68 final boolean isMessageBeingProcessed = this.currentlyProcessedMessage != null;
69 if (this.splitter.isStartingLine(line)) {
70
71 if (isMessageBeingProcessed) {
72 final Message completeMessage = this.handleCompleteMessage(this.currentlyProcessedMessage);
73 if (completeMessage != null) {
74 this.previousAcceptedMessage = new WeakReference<Message>(completeMessage);
75 }
76 }
77
78 this.currentlyProcessedMessage = new MessageBuilder(line);
79 if (this.previousAcceptedMessage != null) {
80 this.currentlyProcessedMessage.setPreviousMessage(this.previousAcceptedMessage.get());
81 }
82 } else {
83
84 if (!isMessageBeingProcessed) {
85
86 return;
87 }
88 this.currentlyProcessedMessage.add(line);
89 }
90 this.handleIncomingMessage(this.currentlyProcessedMessage);
91 }
92
93 @Override
94 public int countConsumers() {
95 return this.consumers.countConsumers();
96 }
97
98
99
100
101
102
103
104
105 int countMessagesInStorage() {
106 return this.storage.getMessageStore().size();
107 }
108
109 @Override
110 public int countMetrics() {
111 return this.consumers.countMetrics();
112 }
113
114
115
116
117
118
119
120
121
122
123
124
125 protected List<Message> getAllMessages(final Follower follower) {
126 return this.storage.getAllMessages(follower);
127 }
128
129 @Override
130 public MessageMetric<? extends Number, LogWatch> getMetric(final String id) {
131 return this.consumers.getMetric(id);
132 }
133
134 @Override
135 public String getMetricId(final MessageMetric<? extends Number, LogWatch> measure) {
136 return this.consumers.getMetricId(measure);
137 }
138
139 public long getUniqueId() {
140 return this.uniqueId;
141 }
142
143 @Override
144 public File getWatchedFile() {
145 return this.watchedFile;
146 }
147
148
149
150
151
152
153
154
155
156
157
158 private synchronized Message handleCompleteMessage(final MessageBuilder messageBuilder) {
159 final Message message = messageBuilder.buildFinal(this.splitter);
160 final boolean messageAccepted = this.storage.registerMessage(message, this);
161 final MessageDeliveryStatus status = messageAccepted ? MessageDeliveryStatus.ACCEPTED
162 : MessageDeliveryStatus.REJECTED;
163 this.consumers.messageReceived(message, status, this);
164 return messageAccepted ? message : null;
165 }
166
167
168
169
170
171
172
173
174
175 private synchronized Message handleIncomingMessage(final MessageBuilder messageBuilder) {
176 final Message message = messageBuilder.buildIntermediate(this.splitter);
177 this.consumers.messageReceived(message, MessageDeliveryStatus.INCOMING, this);
178 return message;
179 }
180
181
182
183
184
185
186
187
188
189
190
191 private synchronized Message handleUndeliveredMessage(final AbstractLogWatchFollower follower,
192 final MessageBuilder messageBuilder) {
193 final Message message = messageBuilder.buildIntermediate(this.splitter);
194 follower.messageReceived(message, MessageDeliveryStatus.INCOMPLETE, this);
195 return message;
196 }
197
198 @Override
199 public boolean isConsuming(final MessageConsumer<LogWatch> consumer) {
200 return this.consumers.isConsuming(consumer);
201 }
202
203 @Override
204 public synchronized boolean isFollowedBy(final Follower follower) {
205 return this.isConsuming(follower);
206 }
207
208 @Override
209 public boolean isHandingDown(final MessageMeasure<? extends Number, Follower> measure) {
210 return this.handingDown.containsValue(measure);
211 }
212
213 @Override
214 public boolean isHandingDown(final String id) {
215 return this.handingDown.containsKey(id);
216 }
217
218 @Override
219 public boolean isMeasuring(final MessageMetric<? extends Number, LogWatch> metric) {
220 return this.consumers.isMeasuring(metric);
221 }
222
223 @Override
224 public boolean isMeasuring(final String id) {
225 return this.consumers.isMeasuring(id);
226 }
227
228 @Override
229 public synchronized boolean isTerminated() {
230 return this.isTerminated;
231 }
232
233 @Override
234 public synchronized MessageConsumer<LogWatch> startConsuming(final MessageListener<LogWatch> consumer) {
235 final MessageConsumer<LogWatch> result = this.consumers.startConsuming(consumer);
236 this.startTailingIfNecessary(false);
237 return result;
238 }
239
240 @Override
241 public synchronized Follower startFollowing() {
242 final Follower f = this.startFollowingActually(false);
243 this.tailing.waitUntilStarted();
244 return f;
245 }
246
247 @Override
248 public synchronized Pair<Follower, Message> startFollowing(final MidDeliveryMessageCondition<LogWatch> waitFor) {
249 final Follower f = this.startFollowingActually(true);
250 return ImmutablePair.of(f, f.waitFor(waitFor));
251 }
252
253 @Override
254 public synchronized Pair<Follower, Message> startFollowing(final MidDeliveryMessageCondition<LogWatch> waitFor,
255 final long howLong, final TimeUnit unit) {
256 final Follower f = this.startFollowingActually(true);
257 return ImmutablePair.of(f, f.waitFor(waitFor, howLong, unit));
258 }
259
260
261
262
263
264
265
266 private synchronized Follower startFollowingActually(final boolean needsToWait) {
267 if (this.isTerminated()) {
268 throw new IllegalStateException("Cannot start tailing on an already terminated LogWatch.");
269 }
270 this.sweeping.start();
271
272 final List<Pair<String, MessageMeasure<? extends Number, Follower>>> pairs = new ArrayList<Pair<String, MessageMeasure<? extends Number, Follower>>>();
273 for (final BidiMap.Entry<String, MessageMeasure<? extends Number, Follower>> entry : this.handingDown
274 .entrySet()) {
275 pairs.add(ImmutablePair.<String, MessageMeasure<? extends Number, Follower>> of(entry.getKey(),
276 entry.getValue()));
277 }
278
279 final AbstractLogWatchFollower follower = new NonStoringFollower(this, pairs);
280 this.consumers.registerConsumer(follower);
281 this.storage.followerStarted(follower);
282 DefaultLogWatch.LOGGER.info("Registered {} for {}.", follower, this);
283 this.startTailingIfNecessary(needsToWait);
284 return follower;
285 }
286
287 @Override
288 public synchronized boolean startHandingDown(final MessageMeasure<? extends Number, Follower> measure,
289 final String id) {
290 if (this.isTerminated()) {
291 throw new IllegalStateException("Log watch already terminated.");
292 } else if (measure == null) {
293 throw new IllegalArgumentException("Measure may not be null.");
294 } else if (id == null) {
295 throw new IllegalArgumentException("ID may not be null.");
296 } else if (this.handingDown.containsKey(id) || this.handingDown.containsValue(measure)) {
297 return false;
298 }
299 this.handingDown.put(id, measure);
300 return true;
301 }
302
303 @Override
304 public <T extends Number> MessageMetric<T, LogWatch> startMeasuring(final MessageMeasure<T, LogWatch> measure,
305 final String id) {
306 return this.consumers.startMeasuring(measure, id);
307 }
308
309 private synchronized void startTailingIfNecessary(final boolean needsToWait) {
310 if (this.tailing.isRunning()) {
311 return;
312 }
313 if (this.consumers.countConsumers() > 0) {
314
315 this.tailing.start(needsToWait);
316 }
317 }
318
319 @Override
320 public synchronized boolean stopConsuming(final MessageConsumer<LogWatch> consumer) {
321 final boolean result = this.consumers.stopConsuming(consumer);
322 this.stopTailingIfNecessary();
323 return result;
324 }
325
326 @Override
327 public synchronized boolean stopFollowing(final Follower follower) {
328 if (!this.isFollowedBy(follower)) {
329 return false;
330 }
331 if (this.currentlyProcessedMessage != null) {
332 this.handleUndeliveredMessage((AbstractLogWatchFollower) follower, this.currentlyProcessedMessage);
333 }
334 this.consumers.stopConsuming(follower);
335 this.storage.followerTerminated(follower);
336 DefaultLogWatch.LOGGER.info("Unregistered {} for {}.", follower, this);
337 this.stopTailingIfNecessary();
338 return true;
339 }
340
341 @Override
342 public synchronized boolean stopHandingDown(final MessageMeasure<? extends Number, Follower> measure) {
343 return (this.handingDown.removeValue(measure) != null);
344 }
345
346 @Override
347 public synchronized boolean stopHandingDown(final String id) {
348 return (this.handingDown.remove(id) != null);
349 }
350
351 @Override
352 public boolean stopMeasuring(final MessageMetric<? extends Number, LogWatch> metric) {
353 return this.consumers.stopMeasuring(metric);
354 }
355
356 @Override
357 public boolean stopMeasuring(final String id) {
358 return this.consumers.stopMeasuring(id);
359 }
360
361 private synchronized void stopTailingIfNecessary() {
362 if (!this.tailing.isRunning()) {
363 return;
364 }
365 if (this.consumers.countConsumers() == 0) {
366 this.tailing.stop();
367 this.currentlyProcessedMessage = null;
368 }
369 }
370
371
372
373
374
375
376
377 @Override
378 public synchronized boolean terminate() {
379 if (this.isTerminated()) {
380 return false;
381 }
382 DefaultLogWatch.LOGGER.info("Terminating {}.", this);
383 this.isTerminated = true;
384 this.consumers.stop();
385 this.handingDown.clear();
386 this.sweeping.stop();
387 this.previousAcceptedMessage = null;
388 DefaultLogWatch.LOGGER.info("Terminated {}.", this);
389 return true;
390 }
391
392 @Override
393 public String toString() {
394 final StringBuilder builder = new StringBuilder();
395 builder.append("DefaultLogWatch [getUniqueId()=").append(this.getUniqueId()).append(", ");
396 if (this.getWatchedFile() != null) {
397 builder.append("getWatchedFile()=").append(this.getWatchedFile()).append(", ");
398 }
399 builder.append("isTerminated()=").append(this.isTerminated()).append("]");
400 return builder.toString();
401 }
402
403 }