1 package com.github.triceo.splitlog.expectations;
2
3 import java.util.HashSet;
4 import java.util.Set;
5 import java.util.concurrent.ExecutorService;
6 import java.util.concurrent.Executors;
7 import java.util.concurrent.Future;
8 import java.util.concurrent.ThreadFactory;
9 import java.util.concurrent.atomic.AtomicLong;
10
11 import com.github.triceo.splitlog.api.Message;
12 import com.github.triceo.splitlog.api.MessageAction;
13 import com.github.triceo.splitlog.api.MessageConsumer;
14 import com.github.triceo.splitlog.api.MessageDeliveryStatus;
15 import com.github.triceo.splitlog.api.MessageProducer;
16
17
18
19
20
21
22
23
24
25 abstract class AbstractExpectationManager<P extends MessageProducer<P>, C> implements MessageConsumer<P> {
26
27 private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool(new ThreadFactory() {
28
29 private final ThreadGroup group = new ThreadGroup("expectations");
30 private final AtomicLong nextId = new AtomicLong(0);
31
32 @Override
33 public Thread newThread(final Runnable r) {
34 return new Thread(this.group, r, this.group.getName() + "-" + this.nextId.incrementAndGet());
35 }
36
37 });
38 private final Set<AbstractExpectation<C, P>> exchanges = new HashSet<AbstractExpectation<C, P>>();
39 private boolean isStopped = false;
40
41 protected abstract AbstractExpectation<C, P> createExpectation(final C condition, final MessageAction<P> action);
42
43 @Override
44 public boolean isStopped() {
45 return this.isStopped;
46 }
47
48 @Override
49 public synchronized void
50 messageReceived(final Message message, final MessageDeliveryStatus status, final P producer) {
51 if (this.isStopped()) {
52 throw new IllegalStateException("Already stopped.");
53 }
54 for (final AbstractExpectation<C, P> exchange : this.exchanges) {
55 exchange.messageReceived(message, status, producer);
56 }
57 }
58
59
60
61
62
63
64
65
66
67 public synchronized Future<Message> setExpectation(final C condition) {
68 return this.setExpectation(condition, null);
69 }
70
71
72
73
74
75
76
77
78
79 public synchronized Future<Message> setExpectation(final C condition, final MessageAction<P> action) {
80 if (this.isStopped()) {
81 throw new IllegalStateException("Already stopped.");
82 }
83 final AbstractExpectation<C, P> exchange = this.createExpectation(condition, action);
84 this.exchanges.add(exchange);
85 return AbstractExpectationManager.EXECUTOR.submit(exchange);
86 }
87
88 @Override
89 public boolean stop() {
90 if (this.isStopped()) {
91 return false;
92 }
93 this.exchanges.clear();
94 this.isStopped = true;
95 return true;
96 }
97
98
99
100
101
102
103
104
105 protected synchronized boolean unsetExpectation(final AbstractExpectation<C, P> expectation) {
106 return this.exchanges.remove(expectation);
107 }
108
109 }