View Javadoc

1   package com.github.triceo.splitlog;
2   
3   import java.io.BufferedWriter;
4   import java.io.IOException;
5   import java.io.OutputStream;
6   import java.io.OutputStreamWriter;
7   import java.util.ArrayList;
8   import java.util.Collection;
9   import java.util.Collections;
10  import java.util.HashMap;
11  import java.util.HashSet;
12  import java.util.LinkedHashSet;
13  import java.util.List;
14  import java.util.Map;
15  import java.util.Set;
16  import java.util.SortedSet;
17  import java.util.TreeSet;
18  
19  import org.apache.commons.io.IOUtils;
20  import org.slf4j.Logger;
21  
22  import com.github.triceo.splitlog.api.Follower;
23  import com.github.triceo.splitlog.api.LogWatch;
24  import com.github.triceo.splitlog.api.MergingFollower;
25  import com.github.triceo.splitlog.api.Message;
26  import com.github.triceo.splitlog.api.MessageComparator;
27  import com.github.triceo.splitlog.api.MessageDeliveryStatus;
28  import com.github.triceo.splitlog.api.MessageFormatter;
29  import com.github.triceo.splitlog.api.SimpleMessageCondition;
30  import com.github.triceo.splitlog.formatters.UnifyingMessageFormatter;
31  import com.github.triceo.splitlog.logging.SplitlogLoggerFactory;
32  
33  /**
34   * Will use {@link UnifyingMessageFormatter} as default message formatter.
35   *
36   */
37  final class DefaultMergingFollower extends AbstractCommonFollower<MergingFollower, Follower> implements MergingFollower {
38  
39      private static final Logger LOGGER = SplitlogLoggerFactory.getLogger(DefaultMergingFollower.class);
40  
41      private final ConsumerManager<MergingFollower> consumers = new ConsumerManager<MergingFollower>(this);
42      private final Set<Follower> followers = new LinkedHashSet<Follower>();
43  
44      protected DefaultMergingFollower(final Follower... followers) {
45          DefaultMergingFollower.LOGGER.info("Merging followers into {}.", this);
46          final Set<LogWatch> watches = new HashSet<LogWatch>();
47          for (final Follower f : followers) {
48              final DefaultFollower af = (DefaultFollower) f;
49              this.followers.add(af);
50              af.registerConsumer(this);
51              watches.add(af.getFollowed());
52          }
53          if (watches.size() < this.followers.size()) {
54              DefaultMergingFollower.LOGGER.warn("Followers from the same LogWatch, possible message duplication.");
55          }
56          DefaultMergingFollower.LOGGER.info("Followers merged: {}.", this);
57      }
58  
59      @Override
60      protected ConsumerManager<MergingFollower> getConsumerManager() {
61          return this.consumers;
62      }
63  
64      @Override
65      protected MessageFormatter getDefaultFormatter() {
66          return UnifyingMessageFormatter.INSTANCE;
67      }
68  
69      @Override
70      public Collection<Follower> getMerged() {
71          return Collections.unmodifiableSet(this.followers);
72      }
73  
74      @Override
75      public synchronized SortedSet<Message> getMessages(final SimpleMessageCondition condition,
76              final MessageComparator order) {
77          final SortedSet<Message> sorted = new TreeSet<Message>(order);
78          for (final Follower f : this.getMerged()) {
79              for (final Message m : f.getMessages()) {
80                  if (!condition.accept(m)) {
81                      continue;
82                  }
83                  sorted.add(m);
84              }
85          }
86          return Collections.unmodifiableSortedSet(sorted);
87      }
88  
89      @Override
90      public synchronized boolean isStopped() {
91          for (final Follower f : this.followers) {
92              if (!f.isStopped()) {
93                  return false;
94              }
95          }
96          return true;
97      }
98  
99      @Override
100     public MergingFollower mergeWith(final Follower f) {
101         if (f == null) {
102             throw new IllegalArgumentException("Cannot merge with null.");
103         }
104         final Set<Follower> followers = new HashSet<Follower>(this.followers);
105         followers.add(f);
106         return new DefaultMergingFollower(followers.toArray(new Follower[followers.size()]));
107     }
108 
109     @Override
110     public MergingFollower mergeWith(final MergingFollower f) {
111         if (f == null) {
112             throw new IllegalArgumentException("Cannot merge with null.");
113         } else if (f == this) {
114             throw new IllegalArgumentException("Cannot merge with self.");
115         }
116         final Set<Follower> followers = new HashSet<Follower>(this.followers);
117         followers.addAll(f.getMerged());
118         return new DefaultMergingFollower(followers.toArray(new Follower[followers.size()]));
119     }
120 
121     @Override
122     public synchronized void messageReceived(final Message msg, final MessageDeliveryStatus status,
123         final Follower source) {
124         if (this.isStopped()) {
125             throw new IllegalStateException("Follower already stopped.");
126         } else if (!this.getMerged().contains(source)) {
127             throw new IllegalArgumentException("Forbidden notification source: " + source);
128         }
129         DefaultMergingFollower.LOGGER.info("{} notified of '{}' with status {} by {}.", this, msg, status, source);
130         this.getExpectationManager().messageReceived(msg, status, source);
131         this.getConsumerManager().messageReceived(msg, status, this);
132     }
133 
134     @Override
135     public MergingFollower remove(final Follower f) {
136         if (!this.getMerged().contains(f)) {
137             return this;
138         } else if (this.getMerged().size() == 1) {
139             return null;
140         } else {
141             DefaultMergingFollower.LOGGER.info("Separating {} from {}.", f, this);
142             final List<Follower> followers = new ArrayList<Follower>(this.followers);
143             followers.remove(f);
144             return new DefaultMergingFollower(followers.toArray(new Follower[followers.size()]));
145         }
146     }
147 
148     @Override
149     public boolean separate(final Follower f) {
150         if (!this.followers.remove(f)) {
151             return false;
152         }
153         // we know about this follower, so the cast is safe
154         DefaultMergingFollower.LOGGER.info("Separating {} from {}.", f, this);
155         return f.stopConsuming(this);
156     }
157 
158     @Override
159     public synchronized boolean stop() {
160         if (this.isStopped()) {
161             return false;
162         }
163         DefaultMergingFollower.LOGGER.info("Stopping {}.", this);
164         for (final Follower f : this.getMerged()) {
165             f.stop();
166         }
167         this.getConsumerManager().stop();
168         this.getExpectationManager().stop();
169         DefaultMergingFollower.LOGGER.info("Stopped {}.", this);
170         return true;
171     }
172 
173     @Override
174     public String toString() {
175         final StringBuilder builder = new StringBuilder();
176         builder.append("DefaultMergingFollower [getUniqueId()=").append(this.getUniqueId()).append(", ");
177         if (this.getMerged() != null) {
178             builder.append("getMerged()=").append(this.getMerged()).append(", ");
179         }
180         builder.append("isStopped()=").append(this.isStopped()).append("]");
181         return builder.toString();
182     }
183 
184     @Override
185     public boolean write(final OutputStream stream, final SimpleMessageCondition condition,
186         final MessageComparator order, final MessageFormatter formatter) {
187         if (stream == null) {
188             throw new IllegalArgumentException("Stream may not be null.");
189         } else if (condition == null) {
190             throw new IllegalArgumentException("Condition may not be null.");
191         } else if (order == null) {
192             throw new IllegalArgumentException("Comparator may not be null.");
193         }
194         /*
195          * assemble messages per-follower, so that we can properly retrieve
196          * their source
197          */
198         final SortedSet<Message> messages = new TreeSet<Message>(order);
199         final Map<Message, String> messagesToText = new HashMap<Message, String>();
200         for (final Follower f : this.getMerged()) {
201             for (final Message m : f.getMessages(condition)) {
202                 messages.add(m);
203                 messagesToText.put(m, formatter.format(m, f.getFollowed().getWatchedFile()));
204             }
205         }
206         // and now write them in their original order
207         BufferedWriter w = null;
208         try {
209             w = new BufferedWriter(new OutputStreamWriter(stream, "UTF-8"));
210             for (final Message msg : messages) {
211                 w.write(messagesToText.get(msg));
212                 w.newLine();
213             }
214             return true;
215         } catch (final IOException ex) {
216             return false;
217         } finally {
218             IOUtils.closeQuietly(w);
219         }
220     }
221 
222 }