View Javadoc
1   package com.github.triceo.splitlog;
2   
3   import it.unimi.dsi.fastutil.objects.ObjectRBTreeSet;
4   import it.unimi.dsi.fastutil.objects.ObjectSortedSet;
5   
6   import java.io.BufferedWriter;
7   import java.io.IOException;
8   import java.io.OutputStream;
9   import java.io.OutputStreamWriter;
10  import java.util.Collections;
11  import java.util.HashSet;
12  import java.util.List;
13  import java.util.Set;
14  import java.util.SortedSet;
15  import java.util.TreeSet;
16  import java.util.concurrent.atomic.AtomicBoolean;
17  
18  import org.apache.commons.io.IOUtils;
19  import org.apache.commons.lang3.tuple.Pair;
20  import org.slf4j.Logger;
21  
22  import com.github.triceo.splitlog.api.Follower;
23  import com.github.triceo.splitlog.api.LogWatch;
24  import com.github.triceo.splitlog.api.MergingFollower;
25  import com.github.triceo.splitlog.api.Message;
26  import com.github.triceo.splitlog.api.MessageComparator;
27  import com.github.triceo.splitlog.api.MessageDeliveryStatus;
28  import com.github.triceo.splitlog.api.MessageFormatter;
29  import com.github.triceo.splitlog.api.MessageMeasure;
30  import com.github.triceo.splitlog.api.SimpleMessageCondition;
31  import com.github.triceo.splitlog.formatters.NoopMessageFormatter;
32  import com.github.triceo.splitlog.logging.SplitlogLoggerFactory;
33  import com.github.triceo.splitlog.util.LogUtil;
34  import com.github.triceo.splitlog.util.LogUtil.Level;
35  
36  /**
37   * This is a log follower that holds no message data, just the tags. For message
38   * data, it will always turn to the underlying {@link LogWatch}.
39   *
40   * This class assumes that LogWatch and user code are the only two threads that
41   * use it. Never use one instance of this class from two or more user threads.
42   * Otherwise, unpredictable behavior from waitFor() methods is possible.
43   *
44   * Will use {@link NoopMessageFormatter} as default message formatter.
45   */
46  final class DefaultFollower extends AbstractCommonFollower<Follower, LogWatch> implements Follower {
47  
48      private static final Logger LOGGER = SplitlogLoggerFactory.getLogger(DefaultFollower.class);
49  
50      private final ConsumerManager<Follower> consumers = new ConsumerManager<Follower>(this);
51      private final AtomicBoolean isStopped = new AtomicBoolean(false);
52      private final SortedSet<Message> tags = new TreeSet<Message>();
53  
54      private final DefaultLogWatch watch;
55  
56      public DefaultFollower(final DefaultLogWatch watch,
57          final List<Pair<String, MessageMeasure<? extends Number, Follower>>> measuresHandedDown) {
58          for (final Pair<String, MessageMeasure<? extends Number, Follower>> pair : measuresHandedDown) {
59              this.startMeasuring(pair.getValue(), pair.getKey());
60          }
61          this.watch = watch;
62      }
63  
64      @Override
65      protected ConsumerManager<Follower> getConsumerManager() {
66          return this.consumers;
67      }
68  
69      @Override
70      protected MessageFormatter getDefaultFormatter() {
71          return NoopMessageFormatter.INSTANCE;
72      }
73  
74      @Override
75      public LogWatch getFollowed() {
76          return this.getWatch();
77      }
78  
79      @Override
80      public SortedSet<Message> getMessages(final SimpleMessageCondition condition,
81              final MessageComparator order) {
82          final ObjectSortedSet<Message> messages = new ObjectRBTreeSet<Message>(order);
83          for (final Message msg : this.getWatch().getAllMessages(this)) {
84              if (!condition.accept(msg)) {
85                  continue;
86              }
87              messages.add(msg);
88          }
89          messages.addAll(this.tags);
90          return Collections.unmodifiableSortedSet(messages);
91      }
92  
93      protected DefaultLogWatch getWatch() {
94          return this.watch;
95      }
96  
97      @Override
98      public synchronized boolean isStopped() {
99          if (this.isStopped.get()) {
100             return true;
101         } else if (this.getFollowed().isFollowedBy(this)) {
102             return false;
103         } else {
104             this.isStopped.set(true);
105             return true;
106         }
107     }
108 
109     @Override
110     public MergingFollower mergeWith(final Follower f) {
111         if (f == null) {
112             throw new IllegalArgumentException("Cannot merge with null.");
113         } else if (f == this) {
114             throw new IllegalArgumentException("Cannot merge with self.");
115         }
116         return new DefaultMergingFollower(this, f);
117     }
118 
119     @Override
120     public MergingFollower mergeWith(final MergingFollower f) {
121         if (f == null) {
122             throw new IllegalArgumentException("Cannot merge with null.");
123         }
124         final Set<Follower> followers = new HashSet<Follower>(f.getMerged());
125         followers.add(this);
126         return new DefaultMergingFollower(followers.toArray(new Follower[followers.size()]));
127     }
128 
129     @Override
130     public void messageReceived(final Message msg, final MessageDeliveryStatus status,
131         final LogWatch source) {
132         if (this.isStopped()) {
133             throw new IllegalStateException("Follower already stopped.");
134         } else if (source != this.getWatch()) {
135             throw new IllegalArgumentException("Forbidden notification source: " + source);
136         }
137         LogUtil.newMessage(DefaultFollower.LOGGER, Level.INFO, "New message received:", msg, status, source, this);
138         this.getExpectationManager().messageReceived(msg, status, source);
139         this.getConsumerManager().messageReceived(msg, status, this);
140     }
141 
142     @Override
143     public boolean stop() {
144         if (this.isStopped()) {
145             return false;
146         }
147         DefaultFollower.LOGGER.info("Stopping {}.", this);
148         this.getFollowed().stopFollowing(this);
149         this.getConsumerManager().stop();
150         this.getExpectationManager().stop();
151         DefaultFollower.LOGGER.info("Stopped {}.", this);
152         return true;
153     }
154 
155     @Override
156     public Message tag(final String tagLine) {
157         final Message message = new MessageBuilder(tagLine).buildTag();
158         this.tags.add(message);
159         return message;
160     }
161 
162     @Override
163     public String toString() {
164         final StringBuilder builder = new StringBuilder();
165         builder.append("DefaultFollower [getUniqueId()=").append(this.getUniqueId()).append(", ");
166         if (this.getFollowed() != null) {
167             builder.append("getFollowed()=").append(this.getFollowed()).append(", ");
168         }
169         builder.append("isStopped()=").append(this.isStopped()).append("]");
170         return builder.toString();
171     }
172 
173     @Override
174     public boolean write(final OutputStream stream, final SimpleMessageCondition condition,
175         final MessageComparator order, final MessageFormatter formatter) {
176         if (stream == null) {
177             throw new IllegalArgumentException("Stream may not be null.");
178         } else if (condition == null) {
179             throw new IllegalArgumentException("Condition may not be null.");
180         } else if (order == null) {
181             throw new IllegalArgumentException("Comparator may not be null.");
182         }
183         BufferedWriter w = null;
184         try {
185             w = new BufferedWriter(new OutputStreamWriter(stream, "UTF-8"));
186             for (final Message msg : this.getMessages(condition, order)) {
187                 w.write(formatter.format(msg, this.getFollowed().getWatchedFile()));
188                 w.newLine();
189             }
190             return true;
191         } catch (final IOException ex) {
192             return false;
193         } finally {
194             IOUtils.closeQuietly(w);
195         }
196     }
197 
198 }