View Javadoc

1   package com.github.triceo.splitlog;
2   
3   import java.io.File;
4   import java.lang.ref.WeakReference;
5   import java.util.ArrayList;
6   import java.util.List;
7   import java.util.concurrent.Future;
8   import java.util.concurrent.TimeUnit;
9   import java.util.concurrent.atomic.AtomicLong;
10  
11  import org.apache.commons.collections4.BidiMap;
12  import org.apache.commons.collections4.bidimap.DualHashBidiMap;
13  import org.apache.commons.lang3.tuple.ImmutablePair;
14  import org.apache.commons.lang3.tuple.Pair;
15  import org.slf4j.Logger;
16  
17  import com.github.triceo.splitlog.api.Follower;
18  import com.github.triceo.splitlog.api.LogWatch;
19  import com.github.triceo.splitlog.api.LogWatchBuilder;
20  import com.github.triceo.splitlog.api.Message;
21  import com.github.triceo.splitlog.api.MessageConsumer;
22  import com.github.triceo.splitlog.api.MessageDeliveryStatus;
23  import com.github.triceo.splitlog.api.MessageListener;
24  import com.github.triceo.splitlog.api.MessageMeasure;
25  import com.github.triceo.splitlog.api.MessageMetric;
26  import com.github.triceo.splitlog.api.MidDeliveryMessageCondition;
27  import com.github.triceo.splitlog.api.SimpleMessageCondition;
28  import com.github.triceo.splitlog.api.TailSplitter;
29  import com.github.triceo.splitlog.logging.SplitlogLoggerFactory;
30  
31  /**
32   * Default log watch implementation which provides all the bells and whistles so
33   * that the rest of the tool can work together.
34   *
35   * The tailer thread will only be started after {@link #startFollowing()} or
36   * {@link #startConsuming(MessageListener)} is called. Subsequently, it will
37   * only be stopped after there no more running {@link Follower}s or
38   * {@link MessageConsumer}s.
39   */
40  final class DefaultLogWatch implements LogWatch {
41  
42      private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
43      private static final Logger LOGGER = SplitlogLoggerFactory.getLogger(DefaultLogWatch.class);
44  
45      private final ConsumerManager<LogWatch> consumers;
46      private MessageBuilder currentlyProcessedMessage;
47      private final SimpleMessageCondition gateCondition;
48      private final BidiMap<String, MessageMeasure<? extends Number, Follower>> handingDown = new DualHashBidiMap<String, MessageMeasure<? extends Number, Follower>>();
49      private boolean isTerminated = false;
50      private WeakReference<Message> previousAcceptedMessage;
51      private final TailSplitter splitter;
52      private final LogWatchStorageManager storage;
53      private final long uniqueId = DefaultLogWatch.ID_GENERATOR.getAndIncrement();
54      private final File watchedFile;
55  
56      protected DefaultLogWatch(final LogWatchBuilder builder, final TailSplitter splitter) {
57          this.splitter = splitter;
58          this.gateCondition = builder.getGateCondition();
59          this.storage = new LogWatchStorageManager(this, builder.getCapacityLimit(), builder.getStorageCondition());
60          this.consumers = new LogWatchConsumerManager(builder, this.storage, this);
61          this.watchedFile = builder.getFileToWatch();
62      }
63  
64      synchronized void addLine(final String line) {
65          final boolean isMessageBeingProcessed = this.currentlyProcessedMessage != null;
66          if (this.splitter.isStartingLine(line)) {
67              // new message begins
68              if (isMessageBeingProcessed) { // finish old message
69                  final Message completeMessage = this.currentlyProcessedMessage.buildFinal(this.splitter);
70                  final MessageDeliveryStatus accepted = this.handleCompleteMessage(completeMessage);
71                  if (accepted == null) {
72                      DefaultLogWatch.LOGGER.info("Message {} rejected at the gate to {}.", completeMessage, this);
73                  } else if (accepted == MessageDeliveryStatus.ACCEPTED) {
74                      this.previousAcceptedMessage = new WeakReference<Message>(completeMessage);
75                  } else {
76                      DefaultLogWatch.LOGGER.info("Message {} rejected from storage in {}.", completeMessage, this);
77                  }
78              }
79              // prepare for new message
80              this.currentlyProcessedMessage = new MessageBuilder(line);
81              if (this.previousAcceptedMessage != null) {
82                  this.currentlyProcessedMessage.setPreviousMessage(this.previousAcceptedMessage.get());
83              }
84          } else {
85              // continue present message
86              if (!isMessageBeingProcessed) {
87                  // most likely just a garbage immediately after start
88                  return;
89              }
90              this.currentlyProcessedMessage.add(line);
91          }
92          this.handleIncomingMessage(this.currentlyProcessedMessage.buildIntermediate(this.splitter));
93      }
94  
95      @Override
96      public int countConsumers() {
97          return this.consumers.countConsumers();
98      }
99  
100     /**
101      * <strong>This is not part of the public API.</strong> Purely for purposes
102      * of testing the automated message sweep.
103      *
104      * @return How many messages there currently are in the internal message
105      *         store.
106      */
107     int countMessagesInStorage() {
108         return this.storage.getMessageStore().size();
109     }
110 
111     @Override
112     public int countMetrics() {
113         return this.consumers.countMetrics();
114     }
115 
116     /**
117      * Return all messages that have been sent to a given {@link Follower}, from
118      * its {@link #startFollowing()} until either its
119      * {@link #stopFollowing(Follower)} or to this moment, whichever is
120      * relevant.
121      *
122      * @param follower
123      *            The follower in question.
124      * @return Unmodifiable list of all the received messages, in the order
125      *         received.
126      */
127     protected List<Message> getAllMessages(final Follower follower) {
128         return this.storage.getAllMessages(follower);
129     }
130 
131     @Override
132     public MessageMetric<? extends Number, LogWatch> getMetric(final String id) {
133         return this.consumers.getMetric(id);
134     }
135 
136     @Override
137     public String getMetricId(final MessageMetric<? extends Number, LogWatch> measure) {
138         return this.consumers.getMetricId(measure);
139     }
140 
141     public long getUniqueId() {
142         return this.uniqueId;
143     }
144 
145     @Override
146     public File getWatchedFile() {
147         return this.watchedFile;
148     }
149 
150     /**
151      * Notify {@link MessageConsumer}s of a message that is either
152      * {@link MessageDeliveryStatus#ACCEPTED} or
153      * {@link MessageDeliveryStatus#REJECTED}.
154      *
155      * @param message
156      *            The message in question.
157      * @return Null if stopped at the gate by
158      *         {@link LogWatchBuilder#getGateCondition()},
159      *         {@link MessageDeliveryStatus#ACCEPTED} if accepted in
160      *         {@link LogWatchBuilder#getStorageCondition()},
161      *         {@link MessageDeliveryStatus#REJECTED} otherwise.
162      */
163     private synchronized MessageDeliveryStatus handleCompleteMessage(final Message message) {
164         if (!this.hasToLetMessageThroughTheGate(message)) {
165             return null;
166         }
167         final boolean messageAccepted = this.storage.registerMessage(message, this);
168         final MessageDeliveryStatus status = messageAccepted ? MessageDeliveryStatus.ACCEPTED
169                 : MessageDeliveryStatus.REJECTED;
170         this.consumers.messageReceived(message, status, this);
171         return status;
172     }
173 
174     /**
175      * Notify {@link MessageConsumer}s of a message that is
176      * {@link MessageDeliveryStatus#INCOMING}.
177      *
178      * @param message
179      *            The message in question.
180      * @return True if the message was passed to {@link MessageConsumer}s, false
181      *         if stopped at the gate by
182      *         {@link LogWatchBuilder#getGateCondition()}.
183      */
184     private synchronized boolean handleIncomingMessage(final Message message) {
185         if (!this.hasToLetMessageThroughTheGate(message)) {
186             return false;
187         }
188         this.consumers.messageReceived(message, MessageDeliveryStatus.INCOMING, this);
189         return true;
190     }
191 
192     /**
193      * Notify {@link Follower} of a message that could not be delivered fully as
194      * the Follower terminated. Will not notify local consumers.
195      *
196      * @param follower
197      *            The follower that was terminated.
198      * @param message
199      *            The message in question.
200      * @return True if the message was passed to the {@link Follower}, false if
201      *         stopped at the gate by {@link LogWatchBuilder#getGateCondition()}
202      *         .
203      */
204     private synchronized boolean handleUndeliveredMessage(final Follower follower, final Message message) {
205         if (!this.hasToLetMessageThroughTheGate(message)) {
206             return false;
207         }
208         // FIXME should inform other consumers? or just metrics on LogWatch?
209         follower.messageReceived(message, MessageDeliveryStatus.INCOMPLETE, this);
210         return true;
211     }
212 
213     private boolean hasToLetMessageThroughTheGate(final Message message) {
214         if (this.gateCondition.accept(message)) {
215             return true;
216         } else {
217             DefaultLogWatch.LOGGER.info("Message '{}' stopped at the gate in {}.", message, this);
218             return false;
219         }
220     }
221 
222     @Override
223     public boolean isConsuming(final MessageConsumer<LogWatch> consumer) {
224         return this.consumers.isConsuming(consumer);
225     }
226 
227     @Override
228     public synchronized boolean isFollowedBy(final Follower follower) {
229         return this.isConsuming(follower);
230     }
231 
232     @Override
233     public boolean isHandingDown(final MessageMeasure<? extends Number, Follower> measure) {
234         return this.handingDown.containsValue(measure);
235     }
236 
237     @Override
238     public boolean isHandingDown(final String id) {
239         return this.handingDown.containsKey(id);
240     }
241 
242     @Override
243     public boolean isMeasuring(final MessageMetric<? extends Number, LogWatch> metric) {
244         return this.consumers.isMeasuring(metric);
245     }
246 
247     @Override
248     public boolean isMeasuring(final String id) {
249         return this.consumers.isMeasuring(id);
250     }
251 
252     @Override
253     public synchronized boolean isTerminated() {
254         return this.isTerminated;
255     }
256 
257     @Override
258     public synchronized MessageConsumer<LogWatch> startConsuming(final MessageListener<LogWatch> consumer) {
259         return this.consumers.startConsuming(consumer);
260     }
261 
262     @Override
263     public Follower startFollowing() {
264         return this.startFollowingActually(null).getKey();
265     }
266 
267     @Override
268     public Pair<Follower, Message> startFollowing(final MidDeliveryMessageCondition<LogWatch> waitFor) {
269         final Pair<Follower, Future<Message>> pair = this.startFollowingActually(waitFor);
270         final Follower f = pair.getKey();
271         try {
272             return ImmutablePair.of(f, pair.getValue().get());
273         } catch (final Exception e) {
274             return ImmutablePair.of(f, null);
275         }
276     }
277 
278     @Override
279     public Pair<Follower, Message> startFollowing(final MidDeliveryMessageCondition<LogWatch> waitFor,
280         final long howLong, final TimeUnit unit) {
281         final Pair<Follower, Future<Message>> pair = this.startFollowingActually(waitFor);
282         final Follower f = pair.getKey();
283         try {
284             return ImmutablePair.of(f, pair.getValue().get(howLong, unit));
285         } catch (final Exception e) {
286             return ImmutablePair.of(f, null);
287         }
288     }
289 
290     /**
291      * @param boolean If the tailer needs a delayed start because of
292      *        {@link #startFollowing(MidDeliveryMessageCondition)}, as explained
293      *        in {@link LogWatchBuilder#getDelayBeforeTailingStarts()}.
294      * @return The follower that follows this log watch from now on.
295      */
296     private synchronized Pair<Follower, Future<Message>> startFollowingActually(
297         final MidDeliveryMessageCondition<LogWatch> condition) {
298         if (this.isTerminated()) {
299             throw new IllegalStateException("Cannot start tailing on an already terminated LogWatch.");
300         }
301         // assemble list of consumers to be handing down and then the follower
302         final List<Pair<String, MessageMeasure<? extends Number, Follower>>> pairs = new ArrayList<Pair<String, MessageMeasure<? extends Number, Follower>>>();
303         for (final BidiMap.Entry<String, MessageMeasure<? extends Number, Follower>> entry : this.handingDown
304                 .entrySet()) {
305             pairs.add(ImmutablePair.<String, MessageMeasure<? extends Number, Follower>> of(entry.getKey(),
306                     entry.getValue()));
307         }
308         // register the follower
309         final Follower follower = new DefaultFollower(this, pairs);
310         final Future<Message> expectation = condition == null ? null : follower.expect(condition);
311         this.consumers.registerConsumer(follower);
312         this.storage.followerStarted(follower);
313         DefaultLogWatch.LOGGER.info("Registered {} for {}.", follower, this);
314         return ImmutablePair.of(follower, expectation);
315     }
316 
317     @Override
318     public Pair<Follower, Future<Message>> startFollowingWithExpectation(
319         final MidDeliveryMessageCondition<LogWatch> waitFor) {
320         final Pair<Follower, Future<Message>> pair = this.startFollowingActually(waitFor);
321         return ImmutablePair.of(pair.getKey(), pair.getValue());
322     }
323 
324     @Override
325     public synchronized boolean startHandingDown(final MessageMeasure<? extends Number, Follower> measure,
326         final String id) {
327         if (this.isTerminated()) {
328             throw new IllegalStateException("Log watch already terminated.");
329         } else if (measure == null) {
330             throw new IllegalArgumentException("Measure may not be null.");
331         } else if (id == null) {
332             throw new IllegalArgumentException("ID may not be null.");
333         } else if (this.handingDown.containsKey(id) || this.handingDown.containsValue(measure)) {
334             return false;
335         }
336         this.handingDown.put(id, measure);
337         return true;
338     }
339 
340     @Override
341     public <T extends Number> MessageMetric<T, LogWatch> startMeasuring(final MessageMeasure<T, LogWatch> measure,
342         final String id) {
343         return this.consumers.startMeasuring(measure, id);
344     }
345 
346     @Override
347     public synchronized boolean stopConsuming(final MessageConsumer<LogWatch> consumer) {
348         final boolean result = this.consumers.stopConsuming(consumer);
349         if (this.countConsumers() < 1) {
350             this.currentlyProcessedMessage = null;
351         }
352         return result;
353     }
354 
355     @Override
356     public synchronized boolean stopFollowing(final Follower follower) {
357         if (!this.isFollowedBy(follower)) {
358             return false;
359         }
360         if (this.currentlyProcessedMessage != null) {
361             this.handleUndeliveredMessage(follower, this.currentlyProcessedMessage.buildIntermediate(this.splitter));
362         }
363         this.stopConsuming(follower);
364         this.storage.followerTerminated(follower);
365         DefaultLogWatch.LOGGER.info("Unregistered {} for {}.", follower, this);
366         return true;
367     }
368 
369     @Override
370     public synchronized boolean stopHandingDown(final MessageMeasure<? extends Number, Follower> measure) {
371         return (this.handingDown.removeValue(measure) != null);
372     }
373 
374     @Override
375     public synchronized boolean stopHandingDown(final String id) {
376         return (this.handingDown.remove(id) != null);
377     }
378 
379     @Override
380     public boolean stopMeasuring(final MessageMetric<? extends Number, LogWatch> metric) {
381         return this.consumers.stopMeasuring(metric);
382     }
383 
384     @Override
385     public boolean stopMeasuring(final String id) {
386         return this.consumers.stopMeasuring(id);
387     }
388 
389     /**
390      * Invoking this method will cause the running
391      * {@link LogWatchStorageSweeper} to be de-scheduled. Any currently present
392      * {@link Message}s will only be removed from memory when this watch
393      * instance is removed from memory.
394      */
395     @Override
396     public synchronized boolean terminate() {
397         if (this.isTerminated()) {
398             return false;
399         }
400         DefaultLogWatch.LOGGER.info("Terminating {}.", this);
401         this.isTerminated = true;
402         this.consumers.stop();
403         this.handingDown.clear();
404         this.previousAcceptedMessage = null;
405         DefaultLogWatch.LOGGER.info("Terminated {}.", this);
406         return true;
407     }
408 
409     @Override
410     public String toString() {
411         final StringBuilder builder = new StringBuilder();
412         builder.append("DefaultLogWatch [getUniqueId()=").append(this.getUniqueId()).append(", ");
413         if (this.getWatchedFile() != null) {
414             builder.append("getWatchedFile()=").append(this.getWatchedFile()).append(", ");
415         }
416         builder.append("isTerminated()=").append(this.isTerminated()).append("]");
417         return builder.toString();
418     }
419 
420 }