1 package com.github.triceo.splitlog;
2
3 import it.unimi.dsi.fastutil.objects.ObjectRBTreeSet;
4 import it.unimi.dsi.fastutil.objects.ObjectSortedSet;
5
6 import java.io.BufferedWriter;
7 import java.io.IOException;
8 import java.io.OutputStream;
9 import java.io.OutputStreamWriter;
10 import java.util.Collections;
11 import java.util.HashSet;
12 import java.util.List;
13 import java.util.Set;
14 import java.util.SortedSet;
15 import java.util.TreeSet;
16 import java.util.concurrent.atomic.AtomicBoolean;
17
18 import org.apache.commons.io.IOUtils;
19 import org.apache.commons.lang3.tuple.Pair;
20 import org.slf4j.Logger;
21
22 import com.github.triceo.splitlog.api.Follower;
23 import com.github.triceo.splitlog.api.LogWatch;
24 import com.github.triceo.splitlog.api.MergingFollower;
25 import com.github.triceo.splitlog.api.Message;
26 import com.github.triceo.splitlog.api.MessageComparator;
27 import com.github.triceo.splitlog.api.MessageDeliveryStatus;
28 import com.github.triceo.splitlog.api.MessageFormatter;
29 import com.github.triceo.splitlog.api.MessageMeasure;
30 import com.github.triceo.splitlog.api.SimpleMessageCondition;
31 import com.github.triceo.splitlog.formatters.NoopMessageFormatter;
32 import com.github.triceo.splitlog.logging.SplitlogLoggerFactory;
33 import com.github.triceo.splitlog.util.LogUtil;
34 import com.github.triceo.splitlog.util.LogUtil.Level;
35
36
37
38
39
40
41
42
43
44
45
46 final class DefaultFollower extends AbstractCommonFollower<Follower, LogWatch> implements Follower {
47
48 private static final Logger LOGGER = SplitlogLoggerFactory.getLogger(DefaultFollower.class);
49
50 private final ConsumerManager<Follower> consumers = new ConsumerManager<Follower>(this);
51 private final AtomicBoolean isStopped = new AtomicBoolean(false);
52 private final SortedSet<Message> tags = new TreeSet<Message>();
53
54 private final DefaultLogWatch watch;
55
56 public DefaultFollower(final DefaultLogWatch watch,
57 final List<Pair<String, MessageMeasure<? extends Number, Follower>>> measuresHandedDown) {
58 for (final Pair<String, MessageMeasure<? extends Number, Follower>> pair : measuresHandedDown) {
59 this.startMeasuring(pair.getValue(), pair.getKey());
60 }
61 this.watch = watch;
62 }
63
64 @Override
65 protected ConsumerManager<Follower> getConsumerManager() {
66 return this.consumers;
67 }
68
69 @Override
70 protected MessageFormatter getDefaultFormatter() {
71 return NoopMessageFormatter.INSTANCE;
72 }
73
74 @Override
75 public LogWatch getFollowed() {
76 return this.getWatch();
77 }
78
79 @Override
80 public SortedSet<Message> getMessages(final SimpleMessageCondition condition,
81 final MessageComparator order) {
82 final ObjectSortedSet<Message> messages = new ObjectRBTreeSet<Message>(order);
83 for (final Message msg : this.getWatch().getAllMessages(this)) {
84 if (!condition.accept(msg)) {
85 continue;
86 }
87 messages.add(msg);
88 }
89 messages.addAll(this.tags);
90 return Collections.unmodifiableSortedSet(messages);
91 }
92
93 protected DefaultLogWatch getWatch() {
94 return this.watch;
95 }
96
97 @Override
98 public synchronized boolean isStopped() {
99 if (this.isStopped.get()) {
100 return true;
101 } else if (this.getFollowed().isFollowedBy(this)) {
102 return false;
103 } else {
104 this.isStopped.set(true);
105 return true;
106 }
107 }
108
109 @Override
110 public MergingFollower mergeWith(final Follower f) {
111 if (f == null) {
112 throw new IllegalArgumentException("Cannot merge with null.");
113 } else if (f == this) {
114 throw new IllegalArgumentException("Cannot merge with self.");
115 }
116 return new DefaultMergingFollower(this, f);
117 }
118
119 @Override
120 public MergingFollower mergeWith(final MergingFollower f) {
121 if (f == null) {
122 throw new IllegalArgumentException("Cannot merge with null.");
123 }
124 final Set<Follower> followers = new HashSet<Follower>(f.getMerged());
125 followers.add(this);
126 return new DefaultMergingFollower(followers.toArray(new Follower[followers.size()]));
127 }
128
129 @Override
130 public void messageReceived(final Message msg, final MessageDeliveryStatus status,
131 final LogWatch source) {
132 if (this.isStopped()) {
133 throw new IllegalStateException("Follower already stopped.");
134 } else if (source != this.getWatch()) {
135 throw new IllegalArgumentException("Forbidden notification source: " + source);
136 }
137 LogUtil.newMessage(DefaultFollower.LOGGER, Level.INFO, "New message received:", msg, status, source, this);
138 this.getExpectationManager().messageReceived(msg, status, source);
139 this.getConsumerManager().messageReceived(msg, status, this);
140 }
141
142 @Override
143 public boolean stop() {
144 if (this.isStopped()) {
145 return false;
146 }
147 DefaultFollower.LOGGER.info("Stopping {}.", this);
148 this.getFollowed().stopFollowing(this);
149 this.getConsumerManager().stop();
150 this.getExpectationManager().stop();
151 DefaultFollower.LOGGER.info("Stopped {}.", this);
152 return true;
153 }
154
155 @Override
156 public Message tag(final String tagLine) {
157 final Message message = new MessageBuilder(tagLine).buildTag();
158 this.tags.add(message);
159 return message;
160 }
161
162 @Override
163 public String toString() {
164 final StringBuilder builder = new StringBuilder();
165 builder.append("DefaultFollower [getUniqueId()=").append(this.getUniqueId()).append(", ");
166 if (this.getFollowed() != null) {
167 builder.append("getFollowed()=").append(this.getFollowed()).append(", ");
168 }
169 builder.append("isStopped()=").append(this.isStopped()).append("]");
170 return builder.toString();
171 }
172
173 @Override
174 public boolean write(final OutputStream stream, final SimpleMessageCondition condition,
175 final MessageComparator order, final MessageFormatter formatter) {
176 if (stream == null) {
177 throw new IllegalArgumentException("Stream may not be null.");
178 } else if (condition == null) {
179 throw new IllegalArgumentException("Condition may not be null.");
180 } else if (order == null) {
181 throw new IllegalArgumentException("Comparator may not be null.");
182 }
183 BufferedWriter w = null;
184 try {
185 w = new BufferedWriter(new OutputStreamWriter(stream, "UTF-8"));
186 for (final Message msg : this.getMessages(condition, order)) {
187 w.write(formatter.format(msg, this.getFollowed().getWatchedFile()));
188 w.newLine();
189 }
190 return true;
191 } catch (final IOException ex) {
192 return false;
193 } finally {
194 IOUtils.closeQuietly(w);
195 }
196 }
197
198 }