View Javadoc
1   package com.github.triceo.splitlog;
2   
3   import java.lang.ref.WeakReference;
4   import java.util.concurrent.ExecutorService;
5   import java.util.concurrent.Executors;
6   import java.util.concurrent.TimeUnit;
7   import java.util.concurrent.atomic.AtomicBoolean;
8   import java.util.concurrent.atomic.AtomicLong;
9   
10  import org.apache.commons.io.input.Tailer;
11  import org.slf4j.Logger;
12  
13  import com.github.triceo.splitlog.api.Follower;
14  import com.github.triceo.splitlog.api.LogWatchBuilder;
15  import com.github.triceo.splitlog.api.Message;
16  import com.github.triceo.splitlog.api.MessageDeliveryStatus;
17  import com.github.triceo.splitlog.api.TailSplitter;
18  import com.github.triceo.splitlog.logging.SplitlogLoggerFactory;
19  import com.github.triceo.splitlog.util.SplitlogTailer;
20  import com.github.triceo.splitlog.util.SplitlogThreadFactory;
21  
22  /**
23   * Has a sole responsibility of starting and stopping {@link Tailer} thread when
24   * told so by the {@link DefaultLogWatch}.
25   */
26  final class LogWatchTailingManager {
27  
28      private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool(new SplitlogThreadFactory("tails"));
29  
30      private static final Logger LOGGER = SplitlogLoggerFactory.getLogger(LogWatchTailingManager.class);
31      private final int bufferSize;
32      private MessageBuilder currentlyProcessedMessage;
33      private final long delayBetweenReads;
34      private final AtomicBoolean isReading = new AtomicBoolean(false);
35      private final AtomicBoolean isTailing = new AtomicBoolean(false);
36      private final AtomicLong numberOfTimesThatTailerWasStarted = new AtomicLong(0);
37      private WeakReference<Message> previousAcceptedMessage;
38      private final boolean reopenBetweenReads, ignoreExistingContent;
39      private final TailSplitter splitter;
40      private SplitlogTailer tailer;
41  
42      private final DefaultLogWatch watch;
43  
44      public LogWatchTailingManager(final DefaultLogWatch watch, final LogWatchBuilder builder,
45          final TailSplitter splitter) {
46          this.watch = watch;
47          this.splitter = splitter;
48          this.delayBetweenReads = builder.getDelayBetweenReads();
49          this.bufferSize = builder.getReadingBufferSize();
50          this.reopenBetweenReads = builder.isClosingBetweenReads();
51          this.ignoreExistingContent = !builder.isReadingFromBeginning();
52      }
53  
54      @Deprecated
55      public Message getCurrentlyProcessedMessage() {
56          if (this.currentlyProcessedMessage == null) {
57              return null;
58          } else {
59              return this.currentlyProcessedMessage.buildIntermediate(this.splitter);
60          }
61      }
62  
63      public DefaultLogWatch getWatch() {
64          return this.watch;
65      }
66  
67      protected void readingFinished() {
68          if (!this.isReading.compareAndSet(true, false)) {
69              return;
70          }
71          LogWatchTailingManager.LOGGER.info("Tailing stopped submitting lines.");
72          if (this.currentlyProcessedMessage != null) {
73              /*
74               * there will be no more lines from the current reading burst; the
75               * currently processed message must be marked as INCOMING with the
76               * possibility of being finished in the subsequent reading burst(s).
77               */
78              this.getWatch().messageIncoming(this.currentlyProcessedMessage.buildIntermediate(this.splitter));
79          }
80      }
81  
82      protected void readingStarted() {
83          if (!this.isReading.compareAndSet(false, true)) {
84              return;
85          }
86          LogWatchTailingManager.LOGGER.info("Tailing will now start submitting lines.");
87      }
88  
89      protected void readLine(final String line) {
90          if (!this.isReading.get()) {
91              LogWatchTailingManager.LOGGER.warn("Line '{}' received when the tailer shouldn't have been sending: {}.",
92                      line, this);
93              return;
94          }
95          final boolean isMessageBeingProcessed = this.currentlyProcessedMessage != null;
96          if (this.splitter.isStartingLine(line)) {
97              // new message begins
98              if (isMessageBeingProcessed) { // finish old message
99                  LogWatchTailingManager.LOGGER.debug("Existing message will be finished.");
100                 final Message completeMessage = this.currentlyProcessedMessage.buildFinal(this.splitter);
101                 final MessageDeliveryStatus accepted = this.getWatch().messageArrived(completeMessage);
102                 this.currentlyProcessedMessage = null;
103                 if (accepted == null) {
104                     LogWatchTailingManager.LOGGER.info("Message {} rejected at the gate to {}.", completeMessage, this);
105                 } else if (accepted == MessageDeliveryStatus.ACCEPTED) {
106                     this.previousAcceptedMessage = new WeakReference<Message>(completeMessage);
107                 } else {
108                     LogWatchTailingManager.LOGGER
109                     .info("Message {} rejected from storage in {}.", completeMessage, this);
110                 }
111             }
112             // prepare for new message
113             LogWatchTailingManager.LOGGER.debug("New message is being prepared.");
114             this.currentlyProcessedMessage = new MessageBuilder(line);
115             if (this.previousAcceptedMessage != null) {
116                 this.currentlyProcessedMessage.setPreviousMessage(this.previousAcceptedMessage.get());
117             }
118         } else {
119             // continue present message
120             if (!isMessageBeingProcessed) {
121                 LogWatchTailingManager.LOGGER.debug("Disregarding line as trash.");
122                 // most likely just a garbage immediately after start
123                 return;
124             }
125             LogWatchTailingManager.LOGGER.debug("Existing message is being updated.");
126             this.currentlyProcessedMessage.add(line);
127         }
128         LogWatchTailingManager.LOGGER.debug("Line processing over.");
129     }
130 
131     /**
132      * Start the tailer on a separate thread. Only when a tailer is running can
133      * {@link Follower}s be notified of new {@link Message}s from the log.
134      *
135      * @return True if the start was scheduled, false if scheduled already.
136      */
137     public boolean start() {
138         if (!this.isTailing.compareAndSet(false, true)) {
139             return false;
140         }
141         final boolean willReadFromEnd = this.willReadFromEnd();
142         LogWatchTailingManager.LOGGER.debug("Tailer {} ignore existing file contents.", willReadFromEnd ? "will"
143                 : "won't");
144         this.tailer = new SplitlogTailer(this.watch.getWatchedFile(), new LogWatchTailerListener(this),
145                 this.delayBetweenReads, this.willReadFromEnd(), this.reopenBetweenReads, this.bufferSize);
146         LogWatchTailingManager.EXECUTOR.submit(this.tailer);
147         final long start = System.nanoTime();
148         this.tailer.waitUntilStarted();
149         final long duration = System.nanoTime() - start;
150         LogWatchTailingManager.LOGGER.debug("It took {} ms for the tailing to actually start.",
151                 TimeUnit.NANOSECONDS.toMillis(duration));
152         final long iterationNum = this.numberOfTimesThatTailerWasStarted.incrementAndGet();
153         LogWatchTailingManager.LOGGER.info("Tailing #{} started for {}.", iterationNum, this.watch);
154         return true;
155     }
156 
157     /**
158      * Stop the tailer thread, preventing any {@link Follower}s from receiving
159      * {@link Message}s.
160      *
161      * @return True if stopped, false if never running.
162      */
163     public boolean stop() {
164         if (!this.isTailing.get()) {
165             LogWatchTailingManager.LOGGER.debug("Tailer not running, therefore not terminating.");
166             return false;
167         }
168         /*
169          * terminate tailer; we stop the scheduler and the task will therefore
170          * never be started again
171          */
172         this.tailer.stop();
173         LogWatchTailingManager.LOGGER.info("Terminated tailing #{} for {}.",
174                 this.numberOfTimesThatTailerWasStarted.get(), this.watch);
175         return true;
176     }
177 
178     protected void tailingFinished() {
179         this.isReading.set(false);
180         LogWatchTailingManager.LOGGER.info("Tailing terminated.");
181         if (this.currentlyProcessedMessage != null) {
182             /*
183              * there will be no more lines. the last message must be accepted or
184              * rejected as well.
185              */
186             this.getWatch().messageArrived(this.currentlyProcessedMessage.buildFinal(this.splitter));
187             this.currentlyProcessedMessage = null;
188             this.previousAcceptedMessage = null;
189         }
190     }
191 
192     private boolean willReadFromEnd() {
193         if (this.numberOfTimesThatTailerWasStarted.get() > 0) {
194             return true;
195         } else {
196             return this.ignoreExistingContent;
197         }
198     }
199 
200 }