1 package com.github.triceo.splitlog;
2
3 import java.io.BufferedWriter;
4 import java.io.IOException;
5 import java.io.OutputStream;
6 import java.io.OutputStreamWriter;
7 import java.util.ArrayList;
8 import java.util.Collection;
9 import java.util.Collections;
10 import java.util.HashMap;
11 import java.util.HashSet;
12 import java.util.LinkedHashSet;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.Set;
16 import java.util.SortedSet;
17 import java.util.TreeSet;
18
19 import org.apache.commons.io.IOUtils;
20 import org.slf4j.Logger;
21
22 import com.github.triceo.splitlog.api.Follower;
23 import com.github.triceo.splitlog.api.LogWatch;
24 import com.github.triceo.splitlog.api.MergingFollower;
25 import com.github.triceo.splitlog.api.Message;
26 import com.github.triceo.splitlog.api.MessageComparator;
27 import com.github.triceo.splitlog.api.MessageDeliveryStatus;
28 import com.github.triceo.splitlog.api.MessageFormatter;
29 import com.github.triceo.splitlog.api.SimpleMessageCondition;
30 import com.github.triceo.splitlog.formatters.UnifyingMessageFormatter;
31 import com.github.triceo.splitlog.logging.SplitlogLoggerFactory;
32
33
34
35
36
37 final class DefaultMergingFollower extends AbstractCommonFollower<MergingFollower, Follower> implements MergingFollower {
38
39 private static final Logger LOGGER = SplitlogLoggerFactory.getLogger(DefaultMergingFollower.class);
40
41 private final ConsumerManager<MergingFollower> consumers = new ConsumerManager<MergingFollower>(this);
42 private final Set<Follower> followers = new LinkedHashSet<Follower>();
43
44 protected DefaultMergingFollower(final Follower... followers) {
45 DefaultMergingFollower.LOGGER.info("Merging followers into {}.", this);
46 final Set<LogWatch> watches = new HashSet<LogWatch>();
47 for (final Follower f : followers) {
48 final DefaultFollower af = (DefaultFollower) f;
49 this.followers.add(af);
50 af.registerConsumer(this);
51 watches.add(af.getFollowed());
52 }
53 if (watches.size() < this.followers.size()) {
54 DefaultMergingFollower.LOGGER.warn("Followers from the same LogWatch, possible message duplication.");
55 }
56 DefaultMergingFollower.LOGGER.info("Followers merged: {}.", this);
57 }
58
59 @Override
60 protected ConsumerManager<MergingFollower> getConsumerManager() {
61 return this.consumers;
62 }
63
64 @Override
65 protected MessageFormatter getDefaultFormatter() {
66 return UnifyingMessageFormatter.INSTANCE;
67 }
68
69 @Override
70 public Collection<Follower> getMerged() {
71 return Collections.unmodifiableSet(this.followers);
72 }
73
74 @Override
75 public synchronized SortedSet<Message> getMessages(final SimpleMessageCondition condition,
76 final MessageComparator order) {
77 final SortedSet<Message> sorted = new TreeSet<Message>(order);
78 for (final Follower f : this.getMerged()) {
79 for (final Message m : f.getMessages()) {
80 if (!condition.accept(m)) {
81 continue;
82 }
83 sorted.add(m);
84 }
85 }
86 return Collections.unmodifiableSortedSet(sorted);
87 }
88
89 @Override
90 public synchronized boolean isStopped() {
91 for (final Follower f : this.followers) {
92 if (!f.isStopped()) {
93 return false;
94 }
95 }
96 return true;
97 }
98
99 @Override
100 public MergingFollower mergeWith(final Follower f) {
101 if (f == null) {
102 throw new IllegalArgumentException("Cannot merge with null.");
103 }
104 final Set<Follower> followers = new HashSet<Follower>(this.followers);
105 followers.add(f);
106 return new DefaultMergingFollower(followers.toArray(new Follower[followers.size()]));
107 }
108
109 @Override
110 public MergingFollower mergeWith(final MergingFollower f) {
111 if (f == null) {
112 throw new IllegalArgumentException("Cannot merge with null.");
113 } else if (f == this) {
114 throw new IllegalArgumentException("Cannot merge with self.");
115 }
116 final Set<Follower> followers = new HashSet<Follower>(this.followers);
117 followers.addAll(f.getMerged());
118 return new DefaultMergingFollower(followers.toArray(new Follower[followers.size()]));
119 }
120
121 @Override
122 public synchronized void messageReceived(final Message msg, final MessageDeliveryStatus status,
123 final Follower source) {
124 if (this.isStopped()) {
125 throw new IllegalStateException("Follower already stopped.");
126 } else if (!this.getMerged().contains(source)) {
127 throw new IllegalArgumentException("Forbidden notification source: " + source);
128 }
129 DefaultMergingFollower.LOGGER.info("{} notified of '{}' with status {} by {}.", this, msg, status, source);
130 this.getExpectationManager().messageReceived(msg, status, source);
131 this.getConsumerManager().messageReceived(msg, status, this);
132 }
133
134 @Override
135 public MergingFollower remove(final Follower f) {
136 if (!this.getMerged().contains(f)) {
137 return this;
138 } else if (this.getMerged().size() == 1) {
139 return null;
140 } else {
141 DefaultMergingFollower.LOGGER.info("Separating {} from {}.", f, this);
142 final List<Follower> followers = new ArrayList<Follower>(this.followers);
143 followers.remove(f);
144 return new DefaultMergingFollower(followers.toArray(new Follower[followers.size()]));
145 }
146 }
147
148 @Override
149 public boolean separate(final Follower f) {
150 if (!this.followers.remove(f)) {
151 return false;
152 }
153
154 DefaultMergingFollower.LOGGER.info("Separating {} from {}.", f, this);
155 return f.stopConsuming(this);
156 }
157
158 @Override
159 public synchronized boolean stop() {
160 if (this.isStopped()) {
161 return false;
162 }
163 DefaultMergingFollower.LOGGER.info("Stopping {}.", this);
164 for (final Follower f : this.getMerged()) {
165 f.stop();
166 }
167 this.getConsumerManager().stop();
168 this.getExpectationManager().stop();
169 DefaultMergingFollower.LOGGER.info("Stopped {}.", this);
170 return true;
171 }
172
173 @Override
174 public String toString() {
175 final StringBuilder builder = new StringBuilder();
176 builder.append("DefaultMergingFollower [getUniqueId()=").append(this.getUniqueId()).append(", ");
177 if (this.getMerged() != null) {
178 builder.append("getMerged()=").append(this.getMerged()).append(", ");
179 }
180 builder.append("isStopped()=").append(this.isStopped()).append("]");
181 return builder.toString();
182 }
183
184 @Override
185 public boolean write(final OutputStream stream, final SimpleMessageCondition condition,
186 final MessageComparator order, final MessageFormatter formatter) {
187 if (stream == null) {
188 throw new IllegalArgumentException("Stream may not be null.");
189 } else if (condition == null) {
190 throw new IllegalArgumentException("Condition may not be null.");
191 } else if (order == null) {
192 throw new IllegalArgumentException("Comparator may not be null.");
193 }
194
195
196
197
198 final SortedSet<Message> messages = new TreeSet<Message>(order);
199 final Map<Message, String> messagesToText = new HashMap<Message, String>();
200 for (final Follower f : this.getMerged()) {
201 for (final Message m : f.getMessages(condition)) {
202 messages.add(m);
203 messagesToText.put(m, formatter.format(m, f.getFollowed().getWatchedFile()));
204 }
205 }
206
207 BufferedWriter w = null;
208 try {
209 w = new BufferedWriter(new OutputStreamWriter(stream, "UTF-8"));
210 for (final Message msg : messages) {
211 w.write(messagesToText.get(msg));
212 w.newLine();
213 }
214 return true;
215 } catch (final IOException ex) {
216 return false;
217 } finally {
218 IOUtils.closeQuietly(w);
219 }
220 }
221
222 }