View Javadoc

1   package com.github.triceo.splitlog;
2   
3   import java.io.BufferedWriter;
4   import java.io.IOException;
5   import java.io.OutputStream;
6   import java.io.OutputStreamWriter;
7   import java.util.Collections;
8   import java.util.HashSet;
9   import java.util.LinkedHashSet;
10  import java.util.List;
11  import java.util.Set;
12  import java.util.SortedSet;
13  import java.util.TreeSet;
14  
15  import org.apache.commons.io.IOUtils;
16  import org.apache.commons.lang3.tuple.Pair;
17  import org.slf4j.Logger;
18  
19  import com.github.triceo.splitlog.api.Follower;
20  import com.github.triceo.splitlog.api.LogWatch;
21  import com.github.triceo.splitlog.api.MergingFollower;
22  import com.github.triceo.splitlog.api.Message;
23  import com.github.triceo.splitlog.api.MessageComparator;
24  import com.github.triceo.splitlog.api.MessageDeliveryStatus;
25  import com.github.triceo.splitlog.api.MessageFormatter;
26  import com.github.triceo.splitlog.api.MessageMeasure;
27  import com.github.triceo.splitlog.api.SimpleMessageCondition;
28  import com.github.triceo.splitlog.formatters.NoopMessageFormatter;
29  import com.github.triceo.splitlog.logging.SplitlogLoggerFactory;
30  
31  /**
32   * This is a log follower that holds no message data, just the tags. For message
33   * data, it will always turn to the underlying {@link LogWatch}.
34   *
35   * This class assumes that LogWatch and user code are the only two threads that
36   * use it. Never use one instance of this class from two or more user threads.
37   * Otherwise, unpredictable behavior from waitFor() methods is possible.
38   *
39   * Metrics within will never be terminated (and thus removed) unless done by the
40   * user. Not even when no longer {@link #isFollowing()}.
41   *
42   * FIXME maybe we should do something about that ^^^^
43   *
44   * Will use {@link NoopMessageFormatter} as default message formatter.
45   */
46  final class DefaultFollower extends AbstractCommonFollower<Follower, LogWatch> implements Follower {
47  
48      private static final Logger LOGGER = SplitlogLoggerFactory.getLogger(DefaultFollower.class);
49  
50      private final ConsumerManager<Follower> consumers = new ConsumerManager<Follower>(this);
51      private final Set<Message> tags = new LinkedHashSet<Message>();
52      private final DefaultLogWatch watch;
53  
54      public DefaultFollower(final DefaultLogWatch watch,
55              final List<Pair<String, MessageMeasure<? extends Number, Follower>>> measuresHandedDown) {
56          for (final Pair<String, MessageMeasure<? extends Number, Follower>> pair : measuresHandedDown) {
57              this.startMeasuring(pair.getValue(), pair.getKey(), false);
58          }
59          this.watch = watch;
60      }
61  
62      @Override
63      protected ConsumerManager<Follower> getConsumerManager() {
64          return this.consumers;
65      }
66  
67      @Override
68      protected MessageFormatter getDefaultFormatter() {
69          return NoopMessageFormatter.INSTANCE;
70      }
71  
72      @Override
73      public LogWatch getFollowed() {
74          return this.getWatch();
75      }
76  
77      // FIXME should be synchronized; but then the tests hang weirdly
78      @Override
79      public SortedSet<Message> getMessages(final SimpleMessageCondition condition, final MessageComparator order) {
80          final SortedSet<Message> messages = new TreeSet<Message>(order);
81          for (final Message msg : this.getWatch().getAllMessages(this)) {
82              if (!condition.accept(msg)) {
83                  continue;
84              }
85              messages.add(msg);
86          }
87          messages.addAll(this.getTags());
88          return Collections.unmodifiableSortedSet(messages);
89      }
90  
91      protected Set<Message> getTags() {
92          return this.tags;
93      }
94  
95      protected DefaultLogWatch getWatch() {
96          return this.watch;
97      }
98  
99      @Override
100     public boolean isStopped() {
101         return !this.getFollowed().isFollowedBy(this);
102     }
103 
104     @Override
105     public MergingFollower mergeWith(final Follower f) {
106         if (f == null) {
107             throw new IllegalArgumentException("Cannot merge with null.");
108         } else if (f == this) {
109             throw new IllegalArgumentException("Cannot merge with self.");
110         }
111         return new DefaultMergingFollower(this, f);
112     }
113 
114     @Override
115     public MergingFollower mergeWith(final MergingFollower f) {
116         if (f == null) {
117             throw new IllegalArgumentException("Cannot merge with null.");
118         }
119         final Set<Follower> followers = new HashSet<Follower>(f.getMerged());
120         followers.add(this);
121         return new DefaultMergingFollower(followers.toArray(new Follower[followers.size()]));
122     }
123 
124     @Override
125     public synchronized void messageReceived(final Message msg, final MessageDeliveryStatus status,
126         final LogWatch source) {
127         if (this.isStopped()) {
128             throw new IllegalStateException("Follower already stopped.");
129         } else if (source != this.getWatch()) {
130             throw new IllegalArgumentException("Forbidden notification source: " + source);
131         }
132         DefaultFollower.LOGGER.info("{} notified of '{}' with status {}.", this, msg, status);
133         this.getExchange().messageReceived(msg, status, source);
134         this.getConsumerManager().messageReceived(msg, status, this);
135     }
136 
137     @Override
138     public synchronized boolean stop() {
139         if (this.isStopped()) {
140             return false;
141         }
142         DefaultFollower.LOGGER.info("Stopping {}.", this);
143         this.getFollowed().stopFollowing(this);
144         this.getConsumerManager().stop();
145         this.getExchange().stop();
146         DefaultFollower.LOGGER.info("Stopped {}.", this);
147         return true;
148     }
149 
150     @Override
151     public Message tag(final String tagLine) {
152         final Message message = new MessageBuilder(tagLine).buildTag();
153         this.tags.add(message);
154         return message;
155     }
156 
157     @Override
158     public String toString() {
159         final StringBuilder builder = new StringBuilder();
160         builder.append("DefaultFollower [getUniqueId()=").append(this.getUniqueId()).append(", ");
161         if (this.getFollowed() != null) {
162             builder.append("getFollowed()=").append(this.getFollowed()).append(", ");
163         }
164         builder.append("isStopped()=").append(this.isStopped()).append("]");
165         return builder.toString();
166     }
167 
168     @Override
169     public boolean write(final OutputStream stream, final SimpleMessageCondition condition,
170         final MessageComparator order, final MessageFormatter formatter) {
171         if (stream == null) {
172             throw new IllegalArgumentException("Stream may not be null.");
173         } else if (condition == null) {
174             throw new IllegalArgumentException("Condition may not be null.");
175         } else if (order == null) {
176             throw new IllegalArgumentException("Comparator may not be null.");
177         }
178         BufferedWriter w = null;
179         try {
180             w = new BufferedWriter(new OutputStreamWriter(stream, "UTF-8"));
181             for (final Message msg : this.getMessages(condition, order)) {
182                 w.write(formatter.format(msg, this.getFollowed().getWatchedFile()));
183                 w.newLine();
184             }
185             return true;
186         } catch (final IOException ex) {
187             return false;
188         } finally {
189             IOUtils.closeQuietly(w);
190         }
191     }
192 
193 }