1 package com.github.triceo.splitlog;
2
3 import java.util.Collections;
4 import java.util.SortedSet;
5 import java.util.TreeSet;
6
7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
9
10 import com.github.triceo.splitlog.api.Follower;
11 import com.github.triceo.splitlog.api.MergingFollower;
12 import com.github.triceo.splitlog.api.Message;
13 import com.github.triceo.splitlog.api.MessageComparator;
14 import com.github.triceo.splitlog.api.MessageConsumer;
15 import com.github.triceo.splitlog.api.MessageDeliveryStatus;
16 import com.github.triceo.splitlog.api.MessageListener;
17 import com.github.triceo.splitlog.api.SimpleMessageCondition;
18
19 final class NonStoringMergingFollower extends AbstractMergingFollower {
20
21 private static final Logger LOGGER = LoggerFactory.getLogger(NonStoringFollower.class);
22
23 private final ConsumerManager<MergingFollower> consumers = new ConsumerManager<MergingFollower>(this);
24
25 public NonStoringMergingFollower(final Follower... followers) {
26 super(followers);
27 }
28
29 @Override
30 public int countConsumers() {
31 return this.consumers.countConsumers();
32 }
33
34 @Override
35 public SortedSet<Message> getMessages(final SimpleMessageCondition condition, final MessageComparator order) {
36 final SortedSet<Message> sorted = new TreeSet<Message>(order);
37 for (final Follower f : this.getMerged()) {
38 for (final Message m : f.getMessages()) {
39 if (!condition.accept(m)) {
40 continue;
41 }
42 sorted.add(m);
43 }
44 }
45 return Collections.unmodifiableSortedSet(sorted);
46 }
47
48 @Override
49 public boolean isConsuming(final MessageConsumer<MergingFollower> consumer) {
50 return this.consumers.isConsuming(consumer);
51 }
52
53 @Override
54 public void messageReceived(final Message msg, final MessageDeliveryStatus status, final Follower source) {
55 if (this.isStopped()) {
56 throw new IllegalStateException("Follower already stopped.");
57 } else if (!this.getMerged().contains(source)) {
58 throw new IllegalArgumentException("Forbidden notification source: " + source);
59 }
60 NonStoringMergingFollower.LOGGER.info("{} notified of '{}' with status {} by {}.", this, msg, status, source);
61 this.getExchange().messageReceived(msg, status, source);
62 this.consumers.messageReceived(msg, status, this);
63 }
64
65 @Override
66 public MessageConsumer<MergingFollower> startConsuming(final MessageListener<MergingFollower> consumer) {
67 return this.consumers.startConsuming(consumer);
68 }
69
70 @Override
71 public boolean stopConsuming(final MessageConsumer<MergingFollower> consumer) {
72 return this.consumers.stopConsuming(consumer);
73 }
74
75 @Override
76 public String toString() {
77 final StringBuilder builder = new StringBuilder();
78 builder.append("NonStoringMergingFollower [");
79 if (this.getMerged() != null) {
80 builder.append("getMerged()=").append(this.getMerged()).append(", ");
81 }
82 builder.append("isStopped()=").append(this.isStopped()).append("]");
83 return builder.toString();
84 }
85
86 }