View Javadoc

1   package com.github.triceo.splitlog;
2   
3   import java.io.File;
4   import java.lang.ref.WeakReference;
5   import java.util.ArrayList;
6   import java.util.List;
7   import java.util.concurrent.Future;
8   import java.util.concurrent.TimeUnit;
9   import java.util.concurrent.atomic.AtomicLong;
10  
11  import org.apache.commons.collections4.BidiMap;
12  import org.apache.commons.collections4.bidimap.DualHashBidiMap;
13  import org.apache.commons.lang3.tuple.ImmutablePair;
14  import org.apache.commons.lang3.tuple.Pair;
15  import org.slf4j.Logger;
16  import org.slf4j.LoggerFactory;
17  
18  import com.github.triceo.splitlog.api.Follower;
19  import com.github.triceo.splitlog.api.LogWatch;
20  import com.github.triceo.splitlog.api.LogWatchBuilder;
21  import com.github.triceo.splitlog.api.Message;
22  import com.github.triceo.splitlog.api.MessageConsumer;
23  import com.github.triceo.splitlog.api.MessageDeliveryStatus;
24  import com.github.triceo.splitlog.api.MessageListener;
25  import com.github.triceo.splitlog.api.MessageMeasure;
26  import com.github.triceo.splitlog.api.MessageMetric;
27  import com.github.triceo.splitlog.api.MidDeliveryMessageCondition;
28  import com.github.triceo.splitlog.api.SimpleMessageCondition;
29  import com.github.triceo.splitlog.api.TailSplitter;
30  import com.github.triceo.splitlog.logging.SplitlogLoggerFactory;
31  
32  /**
33   * Default log watch implementation which provides all the bells and whistles so
34   * that the rest of the tool can work together.
35   *
36   * The tailer thread will only be started after {@link #startFollowing()} or
37   * {@link #startConsuming(MessageListener)} is called. Subsequently, it will
38   * only be stopped after there no more running {@link Follower}s or
39   * {@link MessageConsumer}s.
40   */
41  final class DefaultLogWatch implements LogWatch {
42  
43      private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
44  
45      private static final Logger LOGGER = SplitlogLoggerFactory.getLogger(DefaultLogWatch.class);
46      static {
47          /*
48           * intentionally using the original logger so that this message can not
49           * be silenced
50           */
51          final Logger l = LoggerFactory.getLogger(DefaultLogWatch.class);
52          if (SplitlogLoggerFactory.isLoggingEnabled()) {
53              l.info("Splitlog's internal logging system can be disabled by setting '{}' system property to '{}'.",
54                      SplitlogLoggerFactory.LOGGING_PROPERTY_NAME, SplitlogLoggerFactory.OFF_STATE);
55          } else {
56              l.warn("This will be the last message from Splitlog, unless you enable Splitlog's internal logging system by setting '{}' system property to '{}'.",
57                      SplitlogLoggerFactory.LOGGING_PROPERTY_NAME, SplitlogLoggerFactory.ON_STATE);
58          }
59      }
60  
61      private final ConsumerManager<LogWatch> consumers = new ConsumerManager<LogWatch>(this);
62      private MessageBuilder currentlyProcessedMessage;
63      private final SimpleMessageCondition gateCondition;
64      private final BidiMap<String, MessageMeasure<? extends Number, Follower>> handingDown = new DualHashBidiMap<String, MessageMeasure<? extends Number, Follower>>();
65      private boolean isTerminated = false;
66      private WeakReference<Message> previousAcceptedMessage;
67      private final TailSplitter splitter;
68      private final LogWatchStorageManager storage;
69      private final LogWatchSweepingManager sweeping;
70      private final LogWatchTailingManager tailing;
71      private final long uniqueId = DefaultLogWatch.ID_GENERATOR.getAndIncrement();
72      private final File watchedFile;
73  
74      protected DefaultLogWatch(final File watchedFile, final TailSplitter splitter, final int capacity,
75          final SimpleMessageCondition gateCondition, final SimpleMessageCondition acceptanceCondition,
76          final long delayBetweenReads, final long delayBetweenSweeps, final boolean ignoreExistingContent,
77          final boolean reopenBetweenReads, final int bufferSize) {
78          this.splitter = splitter;
79          this.gateCondition = gateCondition;
80          this.storage = new LogWatchStorageManager(this, capacity, acceptanceCondition);
81          this.tailing = new LogWatchTailingManager(this, delayBetweenReads, ignoreExistingContent, reopenBetweenReads,
82                  bufferSize);
83          this.sweeping = new LogWatchSweepingManager(this.storage, delayBetweenSweeps);
84          this.watchedFile = watchedFile;
85      }
86  
87      synchronized void addLine(final String line) {
88          final boolean isMessageBeingProcessed = this.currentlyProcessedMessage != null;
89          if (this.splitter.isStartingLine(line)) {
90              // new message begins
91              if (isMessageBeingProcessed) { // finish old message
92                  final Message completeMessage = this.currentlyProcessedMessage.buildFinal(this.splitter);
93                  final MessageDeliveryStatus accepted = this.handleCompleteMessage(completeMessage);
94                  if (accepted == null) {
95                      DefaultLogWatch.LOGGER.info("Message {} rejected at the gate to {}.", completeMessage, this);
96                  } else if (accepted == MessageDeliveryStatus.ACCEPTED) {
97                      this.previousAcceptedMessage = new WeakReference<Message>(completeMessage);
98                  } else {
99                      DefaultLogWatch.LOGGER.info("Message {} rejected from storage in {}.", completeMessage, this);
100                 }
101             }
102             // prepare for new message
103             this.currentlyProcessedMessage = new MessageBuilder(line);
104             if (this.previousAcceptedMessage != null) {
105                 this.currentlyProcessedMessage.setPreviousMessage(this.previousAcceptedMessage.get());
106             }
107         } else {
108             // continue present message
109             if (!isMessageBeingProcessed) {
110                 // most likely just a garbage immediately after start
111                 return;
112             }
113             this.currentlyProcessedMessage.add(line);
114         }
115         this.handleIncomingMessage(this.currentlyProcessedMessage.buildIntermediate(this.splitter));
116     }
117 
118     @Override
119     public int countConsumers() {
120         return this.consumers.countConsumers();
121     }
122 
123     /**
124      * <strong>This is not part of the public API.</strong> Purely for purposes
125      * of testing the automated message sweep.
126      *
127      * @return How many messages there currently are in the internal message
128      *         store.
129      */
130     int countMessagesInStorage() {
131         return this.storage.getMessageStore().size();
132     }
133 
134     @Override
135     public int countMetrics() {
136         return this.consumers.countMetrics();
137     }
138 
139     /**
140      * Return all messages that have been sent to a given {@link Follower}, from
141      * its {@link #startFollowing()} until either its
142      * {@link #stopFollowing(Follower)} or to this moment, whichever is
143      * relevant.
144      *
145      * @param follower
146      *            The follower in question.
147      * @return Unmodifiable list of all the received messages, in the order
148      *         received.
149      */
150     protected List<Message> getAllMessages(final Follower follower) {
151         return this.storage.getAllMessages(follower);
152     }
153 
154     @Override
155     public MessageMetric<? extends Number, LogWatch> getMetric(final String id) {
156         return this.consumers.getMetric(id);
157     }
158 
159     @Override
160     public String getMetricId(final MessageMetric<? extends Number, LogWatch> measure) {
161         return this.consumers.getMetricId(measure);
162     }
163 
164     public long getUniqueId() {
165         return this.uniqueId;
166     }
167 
168     @Override
169     public File getWatchedFile() {
170         return this.watchedFile;
171     }
172 
173     /**
174      * Notify {@link MessageConsumer}s of a message that is either
175      * {@link MessageDeliveryStatus#ACCEPTED} or
176      * {@link MessageDeliveryStatus#REJECTED}.
177      *
178      * @param message
179      *            The message in question.
180      * @return Null if stopped at the gate by
181      *         {@link LogWatchBuilder#getGateCondition()},
182      *         {@link MessageDeliveryStatus#ACCEPTED} if accepted in
183      *         {@link LogWatchBuilder#getStorageCondition()},
184      *         {@link MessageDeliveryStatus#REJECTED} otherwise.
185      */
186     private synchronized MessageDeliveryStatus handleCompleteMessage(final Message message) {
187         if (!this.hasToLetMessageThroughTheGate(message)) {
188             return null;
189         }
190         final boolean messageAccepted = this.storage.registerMessage(message, this);
191         final MessageDeliveryStatus status = messageAccepted ? MessageDeliveryStatus.ACCEPTED
192                 : MessageDeliveryStatus.REJECTED;
193         this.consumers.messageReceived(message, status, this);
194         return status;
195     }
196 
197     /**
198      * Notify {@link MessageConsumer}s of a message that is
199      * {@link MessageDeliveryStatus#INCOMING}.
200      *
201      * @param message
202      *            The message in question.
203      * @return True if the message was passed to {@link MessageConsumer}s, false
204      *         if stopped at the gate by
205      *         {@link LogWatchBuilder#getGateCondition()}.
206      */
207     private synchronized boolean handleIncomingMessage(final Message message) {
208         if (!this.hasToLetMessageThroughTheGate(message)) {
209             return false;
210         }
211         this.consumers.messageReceived(message, MessageDeliveryStatus.INCOMING, this);
212         return true;
213     }
214 
215     /**
216      * Notify {@link Follower} of a message that could not be delivered fully as
217      * the Follower terminated. Will not notify local consumers.
218      *
219      * @param follower
220      *            The follower that was terminated.
221      * @param message
222      *            The message in question.
223      * @return True if the message was passed to the {@link Follower}, false if
224      *         stopped at the gate by {@link LogWatchBuilder#getGateCondition()}
225      *         .
226      */
227     private synchronized boolean handleUndeliveredMessage(final Follower follower, final Message message) {
228         if (!this.hasToLetMessageThroughTheGate(message)) {
229             return false;
230         }
231         // FIXME should inform other consumers? or just metrics on LogWatch?
232         follower.messageReceived(message, MessageDeliveryStatus.INCOMPLETE, this);
233         return true;
234     }
235 
236     private boolean hasToLetMessageThroughTheGate(final Message message) {
237         if (this.gateCondition.accept(message)) {
238             return true;
239         } else {
240             DefaultLogWatch.LOGGER.info("Message '{}' stopped at the gate in {}.", message, this);
241             return false;
242         }
243     }
244 
245     @Override
246     public boolean isConsuming(final MessageConsumer<LogWatch> consumer) {
247         return this.consumers.isConsuming(consumer);
248     }
249 
250     @Override
251     public synchronized boolean isFollowedBy(final Follower follower) {
252         return this.isConsuming(follower);
253     }
254 
255     @Override
256     public boolean isHandingDown(final MessageMeasure<? extends Number, Follower> measure) {
257         return this.handingDown.containsValue(measure);
258     }
259 
260     @Override
261     public boolean isHandingDown(final String id) {
262         return this.handingDown.containsKey(id);
263     }
264 
265     @Override
266     public boolean isMeasuring(final MessageMetric<? extends Number, LogWatch> metric) {
267         return this.consumers.isMeasuring(metric);
268     }
269 
270     @Override
271     public boolean isMeasuring(final String id) {
272         return this.consumers.isMeasuring(id);
273     }
274 
275     @Override
276     public synchronized boolean isTerminated() {
277         return this.isTerminated;
278     }
279 
280     @Override
281     public synchronized MessageConsumer<LogWatch> startConsuming(final MessageListener<LogWatch> consumer) {
282         final MessageConsumer<LogWatch> result = this.consumers.startConsuming(consumer);
283         this.tailing.start();
284         return result;
285     }
286 
287     @Override
288     public Follower startFollowing() {
289         return this.startFollowingActually(null).getKey();
290     }
291 
292     @Override
293     public Pair<Follower, Message> startFollowing(final MidDeliveryMessageCondition<LogWatch> waitFor) {
294         final Pair<Follower, Future<Message>> pair = this.startFollowingActually(waitFor);
295         final Follower f = pair.getKey();
296         try {
297             return ImmutablePair.of(f, pair.getValue().get());
298         } catch (final Exception e) {
299             return ImmutablePair.of(f, null);
300         }
301     }
302 
303     @Override
304     public Pair<Follower, Message> startFollowing(final MidDeliveryMessageCondition<LogWatch> waitFor,
305             final long howLong, final TimeUnit unit) {
306         final Pair<Follower, Future<Message>> pair = this.startFollowingActually(waitFor);
307         final Follower f = pair.getKey();
308         try {
309             return ImmutablePair.of(f, pair.getValue().get(howLong, unit));
310         } catch (final Exception e) {
311             return ImmutablePair.of(f, null);
312         }
313     }
314 
315     /**
316      * @param boolean If the tailer needs a delayed start because of
317      *        {@link #startFollowing(MidDeliveryMessageCondition)}, as explained
318      *        in {@link LogWatchBuilder#getDelayBeforeTailingStarts()}.
319      * @return The follower that follows this log watch from now on.
320      */
321     private synchronized Pair<Follower, Future<Message>> startFollowingActually(
322             final MidDeliveryMessageCondition<LogWatch> condition) {
323         if (this.isTerminated()) {
324             throw new IllegalStateException("Cannot start tailing on an already terminated LogWatch.");
325         }
326         this.sweeping.start();
327         // assemble list of consumers to be handing down and then the follower
328         final List<Pair<String, MessageMeasure<? extends Number, Follower>>> pairs = new ArrayList<Pair<String, MessageMeasure<? extends Number, Follower>>>();
329         for (final BidiMap.Entry<String, MessageMeasure<? extends Number, Follower>> entry : this.handingDown
330                 .entrySet()) {
331             pairs.add(ImmutablePair.<String, MessageMeasure<? extends Number, Follower>> of(entry.getKey(),
332                     entry.getValue()));
333         }
334         // register the follower
335         final Follower follower = new DefaultFollower(this, pairs);
336         final Future<Message> expectation = condition == null ? null : follower.expect(condition);
337         this.consumers.registerConsumer(follower);
338         this.storage.followerStarted(follower);
339         DefaultLogWatch.LOGGER.info("Registered {} for {}.", follower, this);
340         this.tailing.start();
341         return ImmutablePair.of(follower, expectation);
342     }
343 
344     @Override
345     public Pair<Follower, Future<Message>> startFollowingWithExpectation(
346             final MidDeliveryMessageCondition<LogWatch> waitFor) {
347         final Pair<Follower, Future<Message>> pair = this.startFollowingActually(waitFor);
348         return ImmutablePair.of(pair.getKey(), pair.getValue());
349     }
350 
351     @Override
352     public synchronized boolean startHandingDown(final MessageMeasure<? extends Number, Follower> measure,
353         final String id) {
354         if (this.isTerminated()) {
355             throw new IllegalStateException("Log watch already terminated.");
356         } else if (measure == null) {
357             throw new IllegalArgumentException("Measure may not be null.");
358         } else if (id == null) {
359             throw new IllegalArgumentException("ID may not be null.");
360         } else if (this.handingDown.containsKey(id) || this.handingDown.containsValue(measure)) {
361             return false;
362         }
363         this.handingDown.put(id, measure);
364         return true;
365     }
366 
367     @Override
368     public <T extends Number> MessageMetric<T, LogWatch> startMeasuring(final MessageMeasure<T, LogWatch> measure,
369             final String id) {
370         return this.consumers.startMeasuring(measure, id);
371     }
372 
373     @Override
374     public synchronized boolean stopConsuming(final MessageConsumer<LogWatch> consumer) {
375         final boolean result = this.consumers.stopConsuming(consumer);
376         if (this.countConsumers() < 1) {
377             DefaultLogWatch.LOGGER.info("Requested stopping the tailer for {}.", this);
378             this.tailing.stop();
379             this.currentlyProcessedMessage = null;
380         }
381         return result;
382     }
383 
384     @Override
385     public synchronized boolean stopFollowing(final Follower follower) {
386         if (!this.isFollowedBy(follower)) {
387             return false;
388         }
389         if (this.currentlyProcessedMessage != null) {
390             this.handleUndeliveredMessage(follower, this.currentlyProcessedMessage.buildIntermediate(this.splitter));
391         }
392         this.stopConsuming(follower);
393         this.storage.followerTerminated(follower);
394         DefaultLogWatch.LOGGER.info("Unregistered {} for {}.", follower, this);
395         return true;
396     }
397 
398     @Override
399     public synchronized boolean stopHandingDown(final MessageMeasure<? extends Number, Follower> measure) {
400         return (this.handingDown.removeValue(measure) != null);
401     }
402 
403     @Override
404     public synchronized boolean stopHandingDown(final String id) {
405         return (this.handingDown.remove(id) != null);
406     }
407 
408     @Override
409     public boolean stopMeasuring(final MessageMetric<? extends Number, LogWatch> metric) {
410         return this.consumers.stopMeasuring(metric);
411     }
412 
413     @Override
414     public boolean stopMeasuring(final String id) {
415         return this.consumers.stopMeasuring(id);
416     }
417 
418     /**
419      * Invoking this method will cause the running
420      * {@link LogWatchStorageSweeper} to be de-scheduled. Any currently present
421      * {@link Message}s will only be removed from memory when this watch
422      * instance is removed from memory.
423      */
424     @Override
425     public synchronized boolean terminate() {
426         if (this.isTerminated()) {
427             return false;
428         }
429         DefaultLogWatch.LOGGER.info("Terminating {}.", this);
430         this.isTerminated = true;
431         this.consumers.stop();
432         this.tailing.stop();
433         this.handingDown.clear();
434         this.sweeping.stop();
435         this.previousAcceptedMessage = null;
436         DefaultLogWatch.LOGGER.info("Terminated {}.", this);
437         return true;
438     }
439 
440     @Override
441     public String toString() {
442         final StringBuilder builder = new StringBuilder();
443         builder.append("DefaultLogWatch [getUniqueId()=").append(this.getUniqueId()).append(", ");
444         if (this.getWatchedFile() != null) {
445             builder.append("getWatchedFile()=").append(this.getWatchedFile()).append(", ");
446         }
447         builder.append("isTerminated()=").append(this.isTerminated()).append("]");
448         return builder.toString();
449     }
450 
451 }