1 package com.github.triceo.splitlog;
2
3 import java.io.BufferedWriter;
4 import java.io.IOException;
5 import java.io.OutputStream;
6 import java.io.OutputStreamWriter;
7 import java.util.ArrayList;
8 import java.util.Collection;
9 import java.util.Collections;
10 import java.util.HashMap;
11 import java.util.HashSet;
12 import java.util.LinkedHashSet;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.Set;
16 import java.util.SortedSet;
17 import java.util.TreeSet;
18
19 import org.apache.commons.io.IOUtils;
20 import org.slf4j.Logger;
21
22 import com.github.triceo.splitlog.api.Follower;
23 import com.github.triceo.splitlog.api.LogWatch;
24 import com.github.triceo.splitlog.api.MergingFollower;
25 import com.github.triceo.splitlog.api.Message;
26 import com.github.triceo.splitlog.api.MessageComparator;
27 import com.github.triceo.splitlog.api.MessageDeliveryStatus;
28 import com.github.triceo.splitlog.api.MessageFormatter;
29 import com.github.triceo.splitlog.api.SimpleMessageCondition;
30 import com.github.triceo.splitlog.formatters.UnifyingMessageFormatter;
31 import com.github.triceo.splitlog.logging.SplitlogLoggerFactory;
32 import com.github.triceo.splitlog.util.LogUtil;
33 import com.github.triceo.splitlog.util.LogUtil.Level;
34
35
36
37
38
39 final class DefaultMergingFollower extends AbstractCommonFollower<MergingFollower, Follower> implements MergingFollower {
40
41 private static final Logger LOGGER = SplitlogLoggerFactory.getLogger(DefaultMergingFollower.class);
42
43 private final ConsumerManager<MergingFollower> consumers = new ConsumerManager<MergingFollower>(this);
44 private final Set<Follower> followers = new LinkedHashSet<Follower>();
45
46 protected DefaultMergingFollower(final Follower... followers) {
47 DefaultMergingFollower.LOGGER.info("Merging followers into {}.", this);
48 final Set<LogWatch> watches = new HashSet<LogWatch>();
49 for (final Follower f : followers) {
50 final DefaultFollower af = (DefaultFollower) f;
51 this.followers.add(af);
52 af.registerConsumer(this);
53 watches.add(af.getFollowed());
54 }
55 if (watches.size() < this.followers.size()) {
56 DefaultMergingFollower.LOGGER.warn("Followers from the same LogWatch, possible message duplication.");
57 }
58 DefaultMergingFollower.LOGGER.info("Followers merged: {}.", this);
59 }
60
61 @Override
62 protected ConsumerManager<MergingFollower> getConsumerManager() {
63 return this.consumers;
64 }
65
66 @Override
67 protected MessageFormatter getDefaultFormatter() {
68 return UnifyingMessageFormatter.INSTANCE;
69 }
70
71 @Override
72 public Collection<Follower> getMerged() {
73 return Collections.unmodifiableSet(this.followers);
74 }
75
76 @Override
77 public SortedSet<Message> getMessages(final SimpleMessageCondition condition, final MessageComparator order) {
78 final SortedSet<Message> sorted = new TreeSet<Message>(order);
79 for (final Follower f : this.getMerged()) {
80 for (final Message m : f.getMessages()) {
81 if (!condition.accept(m)) {
82 continue;
83 }
84 sorted.add(m);
85 }
86 }
87 return Collections.unmodifiableSortedSet(sorted);
88 }
89
90 @Override
91 public synchronized boolean isStopped() {
92 for (final Follower f : this.followers) {
93 if (!f.isStopped()) {
94 return false;
95 }
96 }
97 return true;
98 }
99
100 @Override
101 public MergingFollower mergeWith(final Follower f) {
102 if (f == null) {
103 throw new IllegalArgumentException("Cannot merge with null.");
104 }
105 final Set<Follower> followers = new HashSet<Follower>(this.followers);
106 followers.add(f);
107 return new DefaultMergingFollower(followers.toArray(new Follower[followers.size()]));
108 }
109
110 @Override
111 public MergingFollower mergeWith(final MergingFollower f) {
112 if (f == null) {
113 throw new IllegalArgumentException("Cannot merge with null.");
114 } else if (f == this) {
115 throw new IllegalArgumentException("Cannot merge with self.");
116 }
117 final Set<Follower> followers = new HashSet<Follower>(this.followers);
118 followers.addAll(f.getMerged());
119 return new DefaultMergingFollower(followers.toArray(new Follower[followers.size()]));
120 }
121
122 @Override
123 public void messageReceived(final Message msg, final MessageDeliveryStatus status, final Follower source) {
124 if (this.isStopped()) {
125 throw new IllegalStateException("Follower already stopped.");
126 } else if (!this.getMerged().contains(source)) {
127 throw new IllegalArgumentException("Forbidden notification source: " + source);
128 }
129 LogUtil.newMessage(DefaultMergingFollower.LOGGER, Level.INFO, "New message received:", msg, status, source,
130 this);
131 this.getExpectationManager().messageReceived(msg, status, source);
132 this.getConsumerManager().messageReceived(msg, status, this);
133 }
134
135 @Override
136 public MergingFollower remove(final Follower f) {
137 if (!this.getMerged().contains(f)) {
138 return this;
139 } else if (this.getMerged().size() == 1) {
140 return null;
141 } else {
142 DefaultMergingFollower.LOGGER.info("Separating {} from {}.", f, this);
143 final List<Follower> followers = new ArrayList<Follower>(this.followers);
144 followers.remove(f);
145 return new DefaultMergingFollower(followers.toArray(new Follower[followers.size()]));
146 }
147 }
148
149 @Override
150 public boolean stop() {
151 if (this.isStopped()) {
152 return false;
153 }
154 DefaultMergingFollower.LOGGER.info("Stopping {}.", this);
155 for (final Follower f : this.getMerged()) {
156 f.stop();
157 }
158 this.getConsumerManager().stop();
159 this.getExpectationManager().stop();
160 DefaultMergingFollower.LOGGER.info("Stopped {}.", this);
161 return true;
162 }
163
164 @Override
165 public String toString() {
166 final StringBuilder builder = new StringBuilder();
167 builder.append("DefaultMergingFollower [getUniqueId()=").append(this.getUniqueId()).append(", ");
168 builder.append("getMerged()=").append(this.getMerged()).append(", ");
169 builder.append("isStopped()=").append(this.isStopped()).append("]");
170 return builder.toString();
171 }
172
173 @Override
174 public boolean write(final OutputStream stream, final SimpleMessageCondition condition,
175 final MessageComparator order, final MessageFormatter formatter) {
176 if (stream == null) {
177 throw new IllegalArgumentException("Stream may not be null.");
178 } else if (condition == null) {
179 throw new IllegalArgumentException("Condition may not be null.");
180 } else if (order == null) {
181 throw new IllegalArgumentException("Comparator may not be null.");
182 }
183
184
185
186
187 final SortedSet<Message> messages = new TreeSet<Message>(order);
188 final Map<Message, String> messagesToText = new HashMap<Message, String>();
189 for (final Follower f : this.getMerged()) {
190 for (final Message m : f.getMessages(condition)) {
191 messages.add(m);
192 messagesToText.put(m, formatter.format(m, f.getFollowed().getWatchedFile()));
193 }
194 }
195
196 BufferedWriter w = null;
197 try {
198 w = new BufferedWriter(new OutputStreamWriter(stream, "UTF-8"));
199 for (final Message msg : messages) {
200 w.write(messagesToText.get(msg));
201 w.newLine();
202 }
203 return true;
204 } catch (final IOException ex) {
205 return false;
206 } finally {
207 IOUtils.closeQuietly(w);
208 }
209 }
210
211 }