1 package com.github.triceo.splitlog.expectations;
2
3 import java.util.concurrent.ConcurrentHashMap;
4 import java.util.concurrent.ConcurrentMap;
5 import java.util.concurrent.ExecutorService;
6 import java.util.concurrent.Executors;
7 import java.util.concurrent.Future;
8 import java.util.concurrent.atomic.AtomicBoolean;
9
10 import org.slf4j.Logger;
11
12 import com.github.triceo.splitlog.api.Message;
13 import com.github.triceo.splitlog.api.MessageAction;
14 import com.github.triceo.splitlog.api.MessageConsumer;
15 import com.github.triceo.splitlog.api.MessageDeliveryStatus;
16 import com.github.triceo.splitlog.api.MessageProducer;
17 import com.github.triceo.splitlog.logging.SplitlogLoggerFactory;
18 import com.github.triceo.splitlog.util.LogUtil;
19 import com.github.triceo.splitlog.util.LogUtil.Level;
20 import com.github.triceo.splitlog.util.SplitlogThreadFactory;
21
22
23
24
25
26
27
28
29
30 abstract class AbstractExpectationManager<P extends MessageProducer<P>, C> implements MessageConsumer<P> {
31
32 private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool(new SplitlogThreadFactory(
33 "expectations"));
34 private static final Logger LOGGER = SplitlogLoggerFactory.getLogger(AbstractExpectationManager.class);
35 private final ConcurrentMap<AbstractExpectation<C, P>, Future<Message>> expectations = new ConcurrentHashMap<AbstractExpectation<C, P>, Future<Message>>();
36
37 private final AtomicBoolean isStopped = new AtomicBoolean(false);
38
39 protected abstract AbstractExpectation<C, P> createExpectation(final C condition, final MessageAction<P> action);
40
41 @Override
42 public boolean isStopped() {
43 return this.isStopped.get();
44 }
45
46 @Override
47 public synchronized void messageReceived(final Message msg, final MessageDeliveryStatus status, final P producer) {
48 if (this.isStopped()) {
49 throw new IllegalStateException("Already stopped.");
50 }
51 LogUtil.newMessage(AbstractExpectationManager.LOGGER, Level.INFO, "New message received:", msg, status,
52 producer, this);
53 for (final AbstractExpectation<C, P> exchange : this.expectations.keySet()) {
54 exchange.messageReceived(msg, status, producer);
55 }
56 }
57
58
59
60
61
62
63
64
65
66 public synchronized Future<Message> setExpectation(final C condition) {
67 return this.setExpectation(condition, null);
68 }
69
70
71
72
73
74
75
76
77
78 public synchronized Future<Message> setExpectation(final C condition, final MessageAction<P> action) {
79 if (this.isStopped()) {
80 throw new IllegalStateException("Already stopped.");
81 }
82 final AbstractExpectation<C, P> expectation = this.createExpectation(condition, action);
83 final Future<Message> future = AbstractExpectationManager.EXECUTOR.submit(expectation);
84 this.expectations.put(expectation, future);
85 AbstractExpectationManager.LOGGER.info("Registered expectation {} with action {}.", expectation, action);
86 return future;
87 }
88
89 @Override
90 public boolean stop() {
91 if (!this.isStopped.compareAndSet(false, true)) {
92
93 return false;
94 }
95 for (final Future<Message> future : this.expectations.values()) {
96 future.cancel(true);
97 }
98 this.expectations.clear();
99 return true;
100 }
101
102
103
104
105
106
107
108
109 protected synchronized boolean unsetExpectation(final AbstractExpectation<C, P> expectation) {
110 if (this.expectations.containsKey(expectation)) {
111 this.expectations.remove(expectation);
112 AbstractExpectationManager.LOGGER.info("Unregistered expectation {}.", expectation);
113 return true;
114 }
115 return false;
116 }
117
118 }