1 package com.github.triceo.splitlog;
2
3 import java.lang.ref.WeakReference;
4 import java.util.concurrent.ExecutorService;
5 import java.util.concurrent.Executors;
6 import java.util.concurrent.TimeUnit;
7 import java.util.concurrent.atomic.AtomicBoolean;
8 import java.util.concurrent.atomic.AtomicLong;
9
10 import org.apache.commons.io.input.Tailer;
11 import org.slf4j.Logger;
12
13 import com.github.triceo.splitlog.api.Follower;
14 import com.github.triceo.splitlog.api.LogWatchBuilder;
15 import com.github.triceo.splitlog.api.Message;
16 import com.github.triceo.splitlog.api.MessageDeliveryStatus;
17 import com.github.triceo.splitlog.api.TailSplitter;
18 import com.github.triceo.splitlog.logging.SplitlogLoggerFactory;
19 import com.github.triceo.splitlog.util.SplitlogTailer;
20 import com.github.triceo.splitlog.util.SplitlogThreadFactory;
21
22
23
24
25
26 final class LogWatchTailingManager {
27
28 private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool(new SplitlogThreadFactory("tails"));
29
30 private static final Logger LOGGER = SplitlogLoggerFactory.getLogger(LogWatchTailingManager.class);
31 private final int bufferSize;
32 private MessageBuilder currentlyProcessedMessage;
33 private final long delayBetweenReads;
34 private final AtomicBoolean isReading = new AtomicBoolean(false);
35 private final AtomicBoolean isTailing = new AtomicBoolean(false);
36 private final AtomicLong numberOfTimesThatTailerWasStarted = new AtomicLong(0);
37 private WeakReference<Message> previousAcceptedMessage;
38 private final boolean reopenBetweenReads, ignoreExistingContent;
39 private final TailSplitter splitter;
40 private SplitlogTailer tailer;
41
42 private final DefaultLogWatch watch;
43
44 public LogWatchTailingManager(final DefaultLogWatch watch, final LogWatchBuilder builder,
45 final TailSplitter splitter) {
46 this.watch = watch;
47 this.splitter = splitter;
48 this.delayBetweenReads = builder.getDelayBetweenReads();
49 this.bufferSize = builder.getReadingBufferSize();
50 this.reopenBetweenReads = builder.isClosingBetweenReads();
51 this.ignoreExistingContent = !builder.isReadingFromBeginning();
52 }
53
54 @Deprecated
55 public Message getCurrentlyProcessedMessage() {
56 if (this.currentlyProcessedMessage == null) {
57 return null;
58 } else {
59 return this.currentlyProcessedMessage.buildIntermediate(this.splitter);
60 }
61 }
62
63 public DefaultLogWatch getWatch() {
64 return this.watch;
65 }
66
67 protected void readingFinished() {
68 if (!this.isReading.compareAndSet(true, false)) {
69 return;
70 }
71 LogWatchTailingManager.LOGGER.info("Tailing stopped submitting lines.");
72 if (this.currentlyProcessedMessage != null) {
73
74
75
76
77
78 this.getWatch().messageIncoming(this.currentlyProcessedMessage.buildIntermediate(this.splitter));
79 }
80 }
81
82 protected void readingStarted() {
83 if (!this.isReading.compareAndSet(false, true)) {
84 return;
85 }
86 LogWatchTailingManager.LOGGER.info("Tailing will now start submitting lines.");
87 }
88
89 protected void readLine(final String line) {
90 if (!this.isReading.get()) {
91 LogWatchTailingManager.LOGGER.warn("Line '{}' received when the tailer shouldn't have been sending: {}.",
92 line, this);
93 return;
94 }
95 final boolean isMessageBeingProcessed = this.currentlyProcessedMessage != null;
96 if (this.splitter.isStartingLine(line)) {
97
98 if (isMessageBeingProcessed) {
99 LogWatchTailingManager.LOGGER.debug("Existing message will be finished.");
100 final Message completeMessage = this.currentlyProcessedMessage.buildFinal(this.splitter);
101 final MessageDeliveryStatus accepted = this.getWatch().messageArrived(completeMessage);
102 this.currentlyProcessedMessage = null;
103 if (accepted == null) {
104 LogWatchTailingManager.LOGGER.info("Message {} rejected at the gate to {}.", completeMessage, this);
105 } else if (accepted == MessageDeliveryStatus.ACCEPTED) {
106 this.previousAcceptedMessage = new WeakReference<Message>(completeMessage);
107 } else {
108 LogWatchTailingManager.LOGGER
109 .info("Message {} rejected from storage in {}.", completeMessage, this);
110 }
111 }
112
113 LogWatchTailingManager.LOGGER.debug("New message is being prepared.");
114 this.currentlyProcessedMessage = new MessageBuilder(line);
115 if (this.previousAcceptedMessage != null) {
116 this.currentlyProcessedMessage.setPreviousMessage(this.previousAcceptedMessage.get());
117 }
118 } else {
119
120 if (!isMessageBeingProcessed) {
121 LogWatchTailingManager.LOGGER.debug("Disregarding line as trash.");
122
123 return;
124 }
125 LogWatchTailingManager.LOGGER.debug("Existing message is being updated.");
126 this.currentlyProcessedMessage.add(line);
127 }
128 LogWatchTailingManager.LOGGER.debug("Line processing over.");
129 }
130
131
132
133
134
135
136
137 public boolean start() {
138 if (!this.isTailing.compareAndSet(false, true)) {
139 return false;
140 }
141 final boolean willReadFromEnd = this.willReadFromEnd();
142 LogWatchTailingManager.LOGGER.debug("Tailer {} ignore existing file contents.", willReadFromEnd ? "will"
143 : "won't");
144 this.tailer = new SplitlogTailer(this.watch.getWatchedFile(), new LogWatchTailerListener(this),
145 this.delayBetweenReads, this.willReadFromEnd(), this.reopenBetweenReads, this.bufferSize);
146 LogWatchTailingManager.EXECUTOR.submit(this.tailer);
147 final long start = System.nanoTime();
148 this.tailer.waitUntilStarted();
149 final long duration = System.nanoTime() - start;
150 LogWatchTailingManager.LOGGER.debug("It took {} ms for the tailing to actually start.",
151 TimeUnit.NANOSECONDS.toMillis(duration));
152 final long iterationNum = this.numberOfTimesThatTailerWasStarted.incrementAndGet();
153 LogWatchTailingManager.LOGGER.info("Tailing #{} started for {}.", iterationNum, this.watch);
154 return true;
155 }
156
157
158
159
160
161
162
163 public boolean stop() {
164 if (!this.isTailing.get()) {
165 LogWatchTailingManager.LOGGER.debug("Tailer not running, therefore not terminating.");
166 return false;
167 }
168
169
170
171
172 this.tailer.stop();
173 LogWatchTailingManager.LOGGER.info("Terminated tailing #{} for {}.",
174 this.numberOfTimesThatTailerWasStarted.get(), this.watch);
175 return true;
176 }
177
178 protected void tailingFinished() {
179 this.isReading.set(false);
180 LogWatchTailingManager.LOGGER.info("Tailing terminated.");
181 if (this.currentlyProcessedMessage != null) {
182
183
184
185
186 this.getWatch().messageArrived(this.currentlyProcessedMessage.buildFinal(this.splitter));
187 this.currentlyProcessedMessage = null;
188 this.previousAcceptedMessage = null;
189 }
190 }
191
192 private boolean willReadFromEnd() {
193 if (this.numberOfTimesThatTailerWasStarted.get() > 0) {
194 return true;
195 } else {
196 return this.ignoreExistingContent;
197 }
198 }
199
200 }