View Javadoc

1   package com.github.triceo.splitlog;
2   
3   import java.io.File;
4   import java.lang.ref.WeakReference;
5   import java.util.ArrayList;
6   import java.util.List;
7   import java.util.concurrent.TimeUnit;
8   import java.util.concurrent.atomic.AtomicLong;
9   
10  import org.apache.commons.collections4.BidiMap;
11  import org.apache.commons.collections4.bidimap.DualHashBidiMap;
12  import org.apache.commons.lang3.tuple.ImmutablePair;
13  import org.apache.commons.lang3.tuple.Pair;
14  import org.slf4j.Logger;
15  import org.slf4j.LoggerFactory;
16  
17  import com.github.triceo.splitlog.api.Follower;
18  import com.github.triceo.splitlog.api.LogWatch;
19  import com.github.triceo.splitlog.api.Message;
20  import com.github.triceo.splitlog.api.MessageConsumer;
21  import com.github.triceo.splitlog.api.MessageDeliveryStatus;
22  import com.github.triceo.splitlog.api.MessageListener;
23  import com.github.triceo.splitlog.api.MessageMeasure;
24  import com.github.triceo.splitlog.api.MessageMetric;
25  import com.github.triceo.splitlog.api.MidDeliveryMessageCondition;
26  import com.github.triceo.splitlog.api.SimpleMessageCondition;
27  import com.github.triceo.splitlog.api.TailSplitter;
28  
29  /**
30   * Default log watch implementation which provides all the bells and whistles so
31   * that the rest of the tool can work together.
32   *
33   * The tailer thread will only be started after {@link #startFollowing()} or
34   * {@link #startConsuming(MessageListener)} is called. Subsequently, it will
35   * only be stopped after there no more running {@link Follower}s or
36   * {@link MessageConsumer}s.
37   */
38  final class DefaultLogWatch implements LogWatch {
39  
40      private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
41      private static final Logger LOGGER = LoggerFactory.getLogger(DefaultLogWatch.class);
42  
43      private final MeasuringConsumerManager<LogWatch> consumers = new MeasuringConsumerManager<LogWatch>(this);
44      private MessageBuilder currentlyProcessedMessage;
45      private final BidiMap<String, MessageMeasure<? extends Number, Follower>> handingDown = new DualHashBidiMap<String, MessageMeasure<? extends Number, Follower>>();
46      private boolean isTerminated = false;
47      private WeakReference<Message> previousAcceptedMessage;
48      private final TailSplitter splitter;
49      private final LogWatchStorageManager storage;
50      private final LogWatchSweepingManager sweeping;
51      private final LogWatchTailingManager tailing;
52      private final long uniqueId = DefaultLogWatch.ID_GENERATOR.getAndIncrement();
53      private final File watchedFile;
54  
55      protected DefaultLogWatch(final File watchedFile, final TailSplitter splitter, final int capacity,
56          final SimpleMessageCondition acceptanceCondition, final long delayBetweenReads,
57          final long delayBetweenSweeps, final boolean ignoreExistingContent, final boolean reopenBetweenReads,
58          final int bufferSize, final long delayForTailerStart) {
59          this.splitter = splitter;
60          this.storage = new LogWatchStorageManager(this, capacity, acceptanceCondition);
61          this.tailing = new LogWatchTailingManager(this, delayBetweenReads, delayForTailerStart, ignoreExistingContent,
62                  reopenBetweenReads, bufferSize);
63          this.sweeping = new LogWatchSweepingManager(this.storage, delayBetweenSweeps);
64          this.watchedFile = watchedFile;
65      }
66  
67      synchronized void addLine(final String line) {
68          final boolean isMessageBeingProcessed = this.currentlyProcessedMessage != null;
69          if (this.splitter.isStartingLine(line)) {
70              // new message begins
71              if (isMessageBeingProcessed) { // finish old message
72                  final Message completeMessage = this.handleCompleteMessage(this.currentlyProcessedMessage);
73                  if (completeMessage != null) {
74                      this.previousAcceptedMessage = new WeakReference<Message>(completeMessage);
75                  }
76              }
77              // prepare for new message
78              this.currentlyProcessedMessage = new MessageBuilder(line);
79              if (this.previousAcceptedMessage != null) {
80                  this.currentlyProcessedMessage.setPreviousMessage(this.previousAcceptedMessage.get());
81              }
82          } else {
83              // continue present message
84              if (!isMessageBeingProcessed) {
85                  // most likely just a garbage immediately after start
86                  return;
87              }
88              this.currentlyProcessedMessage.add(line);
89          }
90          this.handleIncomingMessage(this.currentlyProcessedMessage);
91      }
92  
93      @Override
94      public int countConsumers() {
95          return this.consumers.countConsumers();
96      }
97  
98      /**
99       * <strong>This is not part of the public API.</strong> Purely for purposes
100      * of testing the automated message sweep.
101      *
102      * @return How many messages there currently are in the internal message
103      *         store.
104      */
105     int countMessagesInStorage() {
106         return this.storage.getMessageStore().size();
107     }
108 
109     @Override
110     public int countMetrics() {
111         return this.consumers.countMetrics();
112     }
113 
114     /**
115      * Return all messages that have been sent to a given {@link Follower}, from
116      * its {@link #startFollowing()} until either its
117      * {@link #stopFollowing(Follower)} or to this moment, whichever is
118      * relevant.
119      *
120      * @param follower
121      *            The follower in question.
122      * @return Unmodifiable list of all the received messages, in the order
123      *         received.
124      */
125     protected List<Message> getAllMessages(final Follower follower) {
126         return this.storage.getAllMessages(follower);
127     }
128 
129     @Override
130     public MessageMetric<? extends Number, LogWatch> getMetric(final String id) {
131         return this.consumers.getMetric(id);
132     }
133 
134     @Override
135     public String getMetricId(final MessageMetric<? extends Number, LogWatch> measure) {
136         return this.consumers.getMetricId(measure);
137     }
138 
139     public long getUniqueId() {
140         return this.uniqueId;
141     }
142 
143     @Override
144     public File getWatchedFile() {
145         return this.watchedFile;
146     }
147 
148     /**
149      * Notify {@link MessageConsumer}s of a message that is either
150      * {@link MessageDeliveryStatus#ACCEPTED} or
151      * {@link MessageDeliveryStatus#REJECTED}.
152      *
153      * @param messageBuilder
154      *            Builder to use to construct the message.
155      * @return The message that was the subject of notifications; null if
156      *         rejected.
157      */
158     private synchronized Message handleCompleteMessage(final MessageBuilder messageBuilder) {
159         final Message message = messageBuilder.buildFinal(this.splitter);
160         final boolean messageAccepted = this.storage.registerMessage(message, this);
161         final MessageDeliveryStatus status = messageAccepted ? MessageDeliveryStatus.ACCEPTED
162                 : MessageDeliveryStatus.REJECTED;
163         this.consumers.messageReceived(message, status, this);
164         return messageAccepted ? message : null;
165     }
166 
167     /**
168      * Notify {@link MessageConsumer}s of a message that is
169      * {@link MessageDeliveryStatus#INCOMING}.
170      *
171      * @param messageBuilder
172      *            Builder to use to construct the message.
173      * @return The message that was the subject of notifications.
174      */
175     private synchronized Message handleIncomingMessage(final MessageBuilder messageBuilder) {
176         final Message message = messageBuilder.buildIntermediate(this.splitter);
177         this.consumers.messageReceived(message, MessageDeliveryStatus.INCOMING, this);
178         return message;
179     }
180 
181     /**
182      * Notify {@link Follower} of a message that could not be delivered fully as
183      * the Follower terminated. Will not notify local consumers.
184      *
185      * @param follower
186      *            The follower that was terminated.
187      * @param messageBuilder
188      *            Builder to use to construct the message.
189      * @return The message that was the subject of notifications.
190      */
191     private synchronized Message handleUndeliveredMessage(final AbstractLogWatchFollower follower,
192         final MessageBuilder messageBuilder) {
193         final Message message = messageBuilder.buildIntermediate(this.splitter);
194         follower.messageReceived(message, MessageDeliveryStatus.INCOMPLETE, this);
195         return message;
196     }
197 
198     @Override
199     public boolean isConsuming(final MessageConsumer<LogWatch> consumer) {
200         return this.consumers.isConsuming(consumer);
201     }
202 
203     @Override
204     public synchronized boolean isFollowedBy(final Follower follower) {
205         return this.isConsuming(follower);
206     }
207 
208     @Override
209     public boolean isHandingDown(final MessageMeasure<? extends Number, Follower> measure) {
210         return this.handingDown.containsValue(measure);
211     }
212 
213     @Override
214     public boolean isHandingDown(final String id) {
215         return this.handingDown.containsKey(id);
216     }
217 
218     @Override
219     public boolean isMeasuring(final MessageMetric<? extends Number, LogWatch> metric) {
220         return this.consumers.isMeasuring(metric);
221     }
222 
223     @Override
224     public boolean isMeasuring(final String id) {
225         return this.consumers.isMeasuring(id);
226     }
227 
228     @Override
229     public synchronized boolean isTerminated() {
230         return this.isTerminated;
231     }
232 
233     @Override
234     public synchronized MessageConsumer<LogWatch> startConsuming(final MessageListener<LogWatch> consumer) {
235         final MessageConsumer<LogWatch> result = this.consumers.startConsuming(consumer);
236         this.startTailingIfNecessary(false);
237         return result;
238     }
239 
240     @Override
241     public synchronized Follower startFollowing() {
242         final Follower f = this.startFollowingActually(false);
243         this.tailing.waitUntilStarted();
244         return f;
245     }
246 
247     @Override
248     public synchronized Pair<Follower, Message> startFollowing(final MidDeliveryMessageCondition<LogWatch> waitFor) {
249         final Follower f = this.startFollowingActually(true);
250         return ImmutablePair.of(f, f.waitFor(waitFor));
251     }
252 
253     @Override
254     public synchronized Pair<Follower, Message> startFollowing(final MidDeliveryMessageCondition<LogWatch> waitFor,
255             final long howLong, final TimeUnit unit) {
256         final Follower f = this.startFollowingActually(true);
257         return ImmutablePair.of(f, f.waitFor(waitFor, howLong, unit));
258     }
259 
260     /**
261      * @param boolean If the tailer needs a delayed start because of
262      *        {@link #startFollowing(MidDeliveryMessageCondition)}, as explained
263      *        in {@link LogWatchBuilder#getDelayBeforeTailingStarts()}.
264      * @return The follower that follows this log watch from now on.
265      */
266     private synchronized Follower startFollowingActually(final boolean needsToWait) {
267         if (this.isTerminated()) {
268             throw new IllegalStateException("Cannot start tailing on an already terminated LogWatch.");
269         }
270         this.sweeping.start();
271         // assemble list of consumers to be handing down and then the follower
272         final List<Pair<String, MessageMeasure<? extends Number, Follower>>> pairs = new ArrayList<Pair<String, MessageMeasure<? extends Number, Follower>>>();
273         for (final BidiMap.Entry<String, MessageMeasure<? extends Number, Follower>> entry : this.handingDown
274                 .entrySet()) {
275             pairs.add(ImmutablePair.<String, MessageMeasure<? extends Number, Follower>> of(entry.getKey(),
276                     entry.getValue()));
277         }
278         // register the follower
279         final AbstractLogWatchFollower follower = new NonStoringFollower(this, pairs);
280         this.consumers.registerConsumer(follower);
281         this.storage.followerStarted(follower);
282         DefaultLogWatch.LOGGER.info("Registered {} for {}.", follower, this);
283         this.startTailingIfNecessary(needsToWait);
284         return follower;
285     }
286 
287     @Override
288     public synchronized boolean startHandingDown(final MessageMeasure<? extends Number, Follower> measure,
289         final String id) {
290         if (this.isTerminated()) {
291             throw new IllegalStateException("Log watch already terminated.");
292         } else if (measure == null) {
293             throw new IllegalArgumentException("Measure may not be null.");
294         } else if (id == null) {
295             throw new IllegalArgumentException("ID may not be null.");
296         } else if (this.handingDown.containsKey(id) || this.handingDown.containsValue(measure)) {
297             return false;
298         }
299         this.handingDown.put(id, measure);
300         return true;
301     }
302 
303     @Override
304     public <T extends Number> MessageMetric<T, LogWatch> startMeasuring(final MessageMeasure<T, LogWatch> measure,
305             final String id) {
306         return this.consumers.startMeasuring(measure, id);
307     }
308 
309     private synchronized void startTailingIfNecessary(final boolean needsToWait) {
310         if (this.tailing.isRunning()) {
311             return;
312         }
313         if (this.consumers.countConsumers() > 0) {
314             // we have listeners, let's start tailing
315             this.tailing.start(needsToWait);
316         }
317     }
318 
319     @Override
320     public synchronized boolean stopConsuming(final MessageConsumer<LogWatch> consumer) {
321         final boolean result = this.consumers.stopConsuming(consumer);
322         this.stopTailingIfNecessary();
323         return result;
324     }
325 
326     @Override
327     public synchronized boolean stopFollowing(final Follower follower) {
328         if (!this.isFollowedBy(follower)) {
329             return false;
330         }
331         if (this.currentlyProcessedMessage != null) {
332             this.handleUndeliveredMessage((AbstractLogWatchFollower) follower, this.currentlyProcessedMessage);
333         }
334         this.consumers.stopConsuming(follower);
335         this.storage.followerTerminated(follower);
336         DefaultLogWatch.LOGGER.info("Unregistered {} for {}.", follower, this);
337         this.stopTailingIfNecessary();
338         return true;
339     }
340 
341     @Override
342     public synchronized boolean stopHandingDown(final MessageMeasure<? extends Number, Follower> measure) {
343         return (this.handingDown.removeValue(measure) != null);
344     }
345 
346     @Override
347     public synchronized boolean stopHandingDown(final String id) {
348         return (this.handingDown.remove(id) != null);
349     }
350 
351     @Override
352     public boolean stopMeasuring(final MessageMetric<? extends Number, LogWatch> metric) {
353         return this.consumers.stopMeasuring(metric);
354     }
355 
356     @Override
357     public boolean stopMeasuring(final String id) {
358         return this.consumers.stopMeasuring(id);
359     }
360 
361     private synchronized void stopTailingIfNecessary() {
362         if (!this.tailing.isRunning()) {
363             return;
364         }
365         if (this.consumers.countConsumers() == 0) {
366             this.tailing.stop();
367             this.currentlyProcessedMessage = null;
368         }
369     }
370 
371     /**
372      * Invoking this method will cause the running
373      * {@link LogWatchStorageSweeper} to be de-scheduled. Any currently present
374      * {@link Message}s will only be removed from memory when this watch
375      * instance is removed from memory.
376      */
377     @Override
378     public synchronized boolean terminate() {
379         if (this.isTerminated()) {
380             return false;
381         }
382         DefaultLogWatch.LOGGER.info("Terminating {}.", this);
383         this.isTerminated = true;
384         this.consumers.stop();
385         this.handingDown.clear();
386         this.sweeping.stop();
387         this.previousAcceptedMessage = null;
388         DefaultLogWatch.LOGGER.info("Terminated {}.", this);
389         return true;
390     }
391 
392     @Override
393     public String toString() {
394         final StringBuilder builder = new StringBuilder();
395         builder.append("DefaultLogWatch [getUniqueId()=").append(this.getUniqueId()).append(", ");
396         if (this.getWatchedFile() != null) {
397             builder.append("getWatchedFile()=").append(this.getWatchedFile()).append(", ");
398         }
399         builder.append("isTerminated()=").append(this.isTerminated()).append("]");
400         return builder.toString();
401     }
402 
403 }