1 package com.github.triceo.splitlog;
2
3 import java.io.BufferedWriter;
4 import java.io.IOException;
5 import java.io.OutputStream;
6 import java.io.OutputStreamWriter;
7 import java.util.Collections;
8 import java.util.HashSet;
9 import java.util.LinkedHashSet;
10 import java.util.List;
11 import java.util.Set;
12 import java.util.SortedSet;
13 import java.util.TreeSet;
14
15 import org.apache.commons.io.IOUtils;
16 import org.apache.commons.lang3.tuple.Pair;
17 import org.slf4j.Logger;
18
19 import com.github.triceo.splitlog.api.Follower;
20 import com.github.triceo.splitlog.api.LogWatch;
21 import com.github.triceo.splitlog.api.MergingFollower;
22 import com.github.triceo.splitlog.api.Message;
23 import com.github.triceo.splitlog.api.MessageComparator;
24 import com.github.triceo.splitlog.api.MessageDeliveryStatus;
25 import com.github.triceo.splitlog.api.MessageFormatter;
26 import com.github.triceo.splitlog.api.MessageMeasure;
27 import com.github.triceo.splitlog.api.SimpleMessageCondition;
28 import com.github.triceo.splitlog.formatters.NoopMessageFormatter;
29 import com.github.triceo.splitlog.logging.SplitlogLoggerFactory;
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46 final class DefaultFollower extends AbstractCommonFollower<Follower, LogWatch> implements Follower {
47
48 private static final Logger LOGGER = SplitlogLoggerFactory.getLogger(DefaultFollower.class);
49
50 private final ConsumerManager<Follower> consumers = new ConsumerManager<Follower>(this);
51 private final Set<Message> tags = new LinkedHashSet<Message>();
52 private final DefaultLogWatch watch;
53
54 public DefaultFollower(final DefaultLogWatch watch,
55 final List<Pair<String, MessageMeasure<? extends Number, Follower>>> measuresHandedDown) {
56 for (final Pair<String, MessageMeasure<? extends Number, Follower>> pair : measuresHandedDown) {
57 this.startMeasuring(pair.getValue(), pair.getKey(), false);
58 }
59 this.watch = watch;
60 }
61
62 @Override
63 protected ConsumerManager<Follower> getConsumerManager() {
64 return this.consumers;
65 }
66
67 @Override
68 protected MessageFormatter getDefaultFormatter() {
69 return NoopMessageFormatter.INSTANCE;
70 }
71
72 @Override
73 public LogWatch getFollowed() {
74 return this.getWatch();
75 }
76
77
78 @Override
79 public SortedSet<Message> getMessages(final SimpleMessageCondition condition, final MessageComparator order) {
80 final SortedSet<Message> messages = new TreeSet<Message>(order);
81 for (final Message msg : this.getWatch().getAllMessages(this)) {
82 if (!condition.accept(msg)) {
83 continue;
84 }
85 messages.add(msg);
86 }
87 messages.addAll(this.getTags());
88 return Collections.unmodifiableSortedSet(messages);
89 }
90
91 protected Set<Message> getTags() {
92 return this.tags;
93 }
94
95 protected DefaultLogWatch getWatch() {
96 return this.watch;
97 }
98
99 @Override
100 public boolean isStopped() {
101 return !this.getFollowed().isFollowedBy(this);
102 }
103
104 @Override
105 public MergingFollower mergeWith(final Follower f) {
106 if (f == null) {
107 throw new IllegalArgumentException("Cannot merge with null.");
108 } else if (f == this) {
109 throw new IllegalArgumentException("Cannot merge with self.");
110 }
111 return new DefaultMergingFollower(this, f);
112 }
113
114 @Override
115 public MergingFollower mergeWith(final MergingFollower f) {
116 if (f == null) {
117 throw new IllegalArgumentException("Cannot merge with null.");
118 }
119 final Set<Follower> followers = new HashSet<Follower>(f.getMerged());
120 followers.add(this);
121 return new DefaultMergingFollower(followers.toArray(new Follower[followers.size()]));
122 }
123
124 @Override
125 public synchronized void messageReceived(final Message msg, final MessageDeliveryStatus status,
126 final LogWatch source) {
127 if (this.isStopped()) {
128 throw new IllegalStateException("Follower already stopped.");
129 } else if (source != this.getWatch()) {
130 throw new IllegalArgumentException("Forbidden notification source: " + source);
131 }
132 DefaultFollower.LOGGER.info("{} notified of '{}' with status {}.", this, msg, status);
133 this.getExchange().messageReceived(msg, status, source);
134 this.getConsumerManager().messageReceived(msg, status, this);
135 }
136
137 @Override
138 public synchronized boolean stop() {
139 if (this.isStopped()) {
140 return false;
141 }
142 DefaultFollower.LOGGER.info("Stopping {}.", this);
143 this.getFollowed().stopFollowing(this);
144 this.getConsumerManager().stop();
145 this.getExchange().stop();
146 DefaultFollower.LOGGER.info("Stopped {}.", this);
147 return true;
148 }
149
150 @Override
151 public Message tag(final String tagLine) {
152 final Message message = new MessageBuilder(tagLine).buildTag();
153 this.tags.add(message);
154 return message;
155 }
156
157 @Override
158 public String toString() {
159 final StringBuilder builder = new StringBuilder();
160 builder.append("DefaultFollower [getUniqueId()=").append(this.getUniqueId()).append(", ");
161 if (this.getFollowed() != null) {
162 builder.append("getFollowed()=").append(this.getFollowed()).append(", ");
163 }
164 builder.append("isStopped()=").append(this.isStopped()).append("]");
165 return builder.toString();
166 }
167
168 @Override
169 public boolean write(final OutputStream stream, final SimpleMessageCondition condition,
170 final MessageComparator order, final MessageFormatter formatter) {
171 if (stream == null) {
172 throw new IllegalArgumentException("Stream may not be null.");
173 } else if (condition == null) {
174 throw new IllegalArgumentException("Condition may not be null.");
175 } else if (order == null) {
176 throw new IllegalArgumentException("Comparator may not be null.");
177 }
178 BufferedWriter w = null;
179 try {
180 w = new BufferedWriter(new OutputStreamWriter(stream, "UTF-8"));
181 for (final Message msg : this.getMessages(condition, order)) {
182 w.write(formatter.format(msg, this.getFollowed().getWatchedFile()));
183 w.newLine();
184 }
185 return true;
186 } catch (final IOException ex) {
187 return false;
188 } finally {
189 IOUtils.closeQuietly(w);
190 }
191 }
192
193 }