View Javadoc
1   package com.github.triceo.splitlog;
2   
3   import java.io.BufferedWriter;
4   import java.io.IOException;
5   import java.io.OutputStream;
6   import java.io.OutputStreamWriter;
7   import java.util.ArrayList;
8   import java.util.Collection;
9   import java.util.Collections;
10  import java.util.HashMap;
11  import java.util.HashSet;
12  import java.util.LinkedHashSet;
13  import java.util.List;
14  import java.util.Map;
15  import java.util.Set;
16  import java.util.SortedSet;
17  import java.util.TreeSet;
18  
19  import org.apache.commons.io.IOUtils;
20  import org.slf4j.Logger;
21  
22  import com.github.triceo.splitlog.api.Follower;
23  import com.github.triceo.splitlog.api.LogWatch;
24  import com.github.triceo.splitlog.api.MergingFollower;
25  import com.github.triceo.splitlog.api.Message;
26  import com.github.triceo.splitlog.api.MessageComparator;
27  import com.github.triceo.splitlog.api.MessageDeliveryStatus;
28  import com.github.triceo.splitlog.api.MessageFormatter;
29  import com.github.triceo.splitlog.api.SimpleMessageCondition;
30  import com.github.triceo.splitlog.formatters.UnifyingMessageFormatter;
31  import com.github.triceo.splitlog.logging.SplitlogLoggerFactory;
32  import com.github.triceo.splitlog.util.LogUtil;
33  import com.github.triceo.splitlog.util.LogUtil.Level;
34  
35  /**
36   * Will use {@link UnifyingMessageFormatter} as default message formatter.
37   *
38   */
39  final class DefaultMergingFollower extends AbstractCommonFollower<MergingFollower, Follower> implements MergingFollower {
40  
41      private static final Logger LOGGER = SplitlogLoggerFactory.getLogger(DefaultMergingFollower.class);
42  
43      private final ConsumerManager<MergingFollower> consumers = new ConsumerManager<MergingFollower>(this);
44      private final Set<Follower> followers = new LinkedHashSet<Follower>();
45  
46      protected DefaultMergingFollower(final Follower... followers) {
47          DefaultMergingFollower.LOGGER.info("Merging followers into {}.", this);
48          final Set<LogWatch> watches = new HashSet<LogWatch>();
49          for (final Follower f : followers) {
50              final DefaultFollower af = (DefaultFollower) f;
51              this.followers.add(af);
52              af.registerConsumer(this);
53              watches.add(af.getFollowed());
54          }
55          if (watches.size() < this.followers.size()) {
56              DefaultMergingFollower.LOGGER.warn("Followers from the same LogWatch, possible message duplication.");
57          }
58          DefaultMergingFollower.LOGGER.info("Followers merged: {}.", this);
59      }
60  
61      @Override
62      protected ConsumerManager<MergingFollower> getConsumerManager() {
63          return this.consumers;
64      }
65  
66      @Override
67      protected MessageFormatter getDefaultFormatter() {
68          return UnifyingMessageFormatter.INSTANCE;
69      }
70  
71      @Override
72      public Collection<Follower> getMerged() {
73          return Collections.unmodifiableSet(this.followers);
74      }
75  
76      @Override
77      public SortedSet<Message> getMessages(final SimpleMessageCondition condition, final MessageComparator order) {
78          final SortedSet<Message> sorted = new TreeSet<Message>(order);
79          for (final Follower f : this.getMerged()) {
80              for (final Message m : f.getMessages()) {
81                  if (!condition.accept(m)) {
82                      continue;
83                  }
84                  sorted.add(m);
85              }
86          }
87          return Collections.unmodifiableSortedSet(sorted);
88      }
89  
90      @Override
91      public synchronized boolean isStopped() {
92          for (final Follower f : this.followers) {
93              if (!f.isStopped()) {
94                  return false;
95              }
96          }
97          return true;
98      }
99  
100     @Override
101     public MergingFollower mergeWith(final Follower f) {
102         if (f == null) {
103             throw new IllegalArgumentException("Cannot merge with null.");
104         }
105         final Set<Follower> followers = new HashSet<Follower>(this.followers);
106         followers.add(f);
107         return new DefaultMergingFollower(followers.toArray(new Follower[followers.size()]));
108     }
109 
110     @Override
111     public MergingFollower mergeWith(final MergingFollower f) {
112         if (f == null) {
113             throw new IllegalArgumentException("Cannot merge with null.");
114         } else if (f == this) {
115             throw new IllegalArgumentException("Cannot merge with self.");
116         }
117         final Set<Follower> followers = new HashSet<Follower>(this.followers);
118         followers.addAll(f.getMerged());
119         return new DefaultMergingFollower(followers.toArray(new Follower[followers.size()]));
120     }
121 
122     @Override
123     public void messageReceived(final Message msg, final MessageDeliveryStatus status, final Follower source) {
124         if (this.isStopped()) {
125             throw new IllegalStateException("Follower already stopped.");
126         } else if (!this.getMerged().contains(source)) {
127             throw new IllegalArgumentException("Forbidden notification source: " + source);
128         }
129         LogUtil.newMessage(DefaultMergingFollower.LOGGER, Level.INFO, "New message received:", msg, status, source,
130                 this);
131         this.getExpectationManager().messageReceived(msg, status, source);
132         this.getConsumerManager().messageReceived(msg, status, this);
133     }
134 
135     @Override
136     public MergingFollower remove(final Follower f) {
137         if (!this.getMerged().contains(f)) {
138             return this;
139         } else if (this.getMerged().size() == 1) {
140             return null;
141         } else {
142             DefaultMergingFollower.LOGGER.info("Separating {} from {}.", f, this);
143             final List<Follower> followers = new ArrayList<Follower>(this.followers);
144             followers.remove(f);
145             return new DefaultMergingFollower(followers.toArray(new Follower[followers.size()]));
146         }
147     }
148 
149     @Override
150     public boolean stop() {
151         if (this.isStopped()) {
152             return false;
153         }
154         DefaultMergingFollower.LOGGER.info("Stopping {}.", this);
155         for (final Follower f : this.getMerged()) {
156             f.stop();
157         }
158         this.getConsumerManager().stop();
159         this.getExpectationManager().stop();
160         DefaultMergingFollower.LOGGER.info("Stopped {}.", this);
161         return true;
162     }
163 
164     @Override
165     public String toString() {
166         final StringBuilder builder = new StringBuilder();
167         builder.append("DefaultMergingFollower [getUniqueId()=").append(this.getUniqueId()).append(", ");
168         builder.append("getMerged()=").append(this.getMerged()).append(", ");
169         builder.append("isStopped()=").append(this.isStopped()).append("]");
170         return builder.toString();
171     }
172 
173     @Override
174     public boolean write(final OutputStream stream, final SimpleMessageCondition condition,
175         final MessageComparator order, final MessageFormatter formatter) {
176         if (stream == null) {
177             throw new IllegalArgumentException("Stream may not be null.");
178         } else if (condition == null) {
179             throw new IllegalArgumentException("Condition may not be null.");
180         } else if (order == null) {
181             throw new IllegalArgumentException("Comparator may not be null.");
182         }
183         /*
184          * assemble messages per-follower, so that we can properly retrieve
185          * their source
186          */
187         final SortedSet<Message> messages = new TreeSet<Message>(order);
188         final Map<Message, String> messagesToText = new HashMap<Message, String>();
189         for (final Follower f : this.getMerged()) {
190             for (final Message m : f.getMessages(condition)) {
191                 messages.add(m);
192                 messagesToText.put(m, formatter.format(m, f.getFollowed().getWatchedFile()));
193             }
194         }
195         // and now write them in their original order
196         BufferedWriter w = null;
197         try {
198             w = new BufferedWriter(new OutputStreamWriter(stream, "UTF-8"));
199             for (final Message msg : messages) {
200                 w.write(messagesToText.get(msg));
201                 w.newLine();
202             }
203             return true;
204         } catch (final IOException ex) {
205             return false;
206         } finally {
207             IOUtils.closeQuietly(w);
208         }
209     }
210 
211 }