View Javadoc
1   package com.github.triceo.splitlog.expectations;
2   
3   import java.util.concurrent.ConcurrentHashMap;
4   import java.util.concurrent.ConcurrentMap;
5   import java.util.concurrent.ExecutorService;
6   import java.util.concurrent.Executors;
7   import java.util.concurrent.Future;
8   import java.util.concurrent.atomic.AtomicBoolean;
9   
10  import org.slf4j.Logger;
11  
12  import com.github.triceo.splitlog.api.Message;
13  import com.github.triceo.splitlog.api.MessageAction;
14  import com.github.triceo.splitlog.api.MessageConsumer;
15  import com.github.triceo.splitlog.api.MessageDeliveryStatus;
16  import com.github.triceo.splitlog.api.MessageProducer;
17  import com.github.triceo.splitlog.logging.SplitlogLoggerFactory;
18  import com.github.triceo.splitlog.util.LogUtil;
19  import com.github.triceo.splitlog.util.LogUtil.Level;
20  import com.github.triceo.splitlog.util.SplitlogThreadFactory;
21  
22  /**
23   * Tracks expectations that are currently active for a given message consumer.
24   *
25   * @param <P>
26   *            The source for the messages.
27   * @param <C>
28   *            The type of condition that the expectations accept.
29   */
30  abstract class AbstractExpectationManager<P extends MessageProducer<P>, C> implements MessageConsumer<P> {
31  
32      private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool(new SplitlogThreadFactory(
33              "expectations"));
34      private static final Logger LOGGER = SplitlogLoggerFactory.getLogger(AbstractExpectationManager.class);
35      private final ConcurrentMap<AbstractExpectation<C, P>, Future<Message>> expectations = new ConcurrentHashMap<AbstractExpectation<C, P>, Future<Message>>();
36  
37      private final AtomicBoolean isStopped = new AtomicBoolean(false);
38  
39      protected abstract AbstractExpectation<C, P> createExpectation(final C condition, final MessageAction<P> action);
40  
41      @Override
42      public boolean isStopped() {
43          return this.isStopped.get();
44      }
45  
46      @Override
47      public synchronized void messageReceived(final Message msg, final MessageDeliveryStatus status, final P producer) {
48          if (this.isStopped()) {
49              throw new IllegalStateException("Already stopped.");
50          }
51          LogUtil.newMessage(AbstractExpectationManager.LOGGER, Level.INFO, "New message received:", msg, status,
52                  producer, this);
53          for (final AbstractExpectation<C, P> exchange : this.expectations.keySet()) {
54              exchange.messageReceived(msg, status, producer);
55          }
56      }
57  
58      /**
59       * The resulting future will only return after such a message is received
60       * that makes the condition true.
61       *
62       * @param condition
63       *            Condition to be true.
64       * @return The future.
65       */
66      public synchronized Future<Message> setExpectation(final C condition) {
67          return this.setExpectation(condition, null);
68      }
69  
70      /**
71       * The resulting future will only return after such a message is received
72       * that makes the condition true.
73       *
74       * @param condition
75       *            Condition to be true.
76       * @return The future.
77       */
78      public synchronized Future<Message> setExpectation(final C condition, final MessageAction<P> action) {
79          if (this.isStopped()) {
80              throw new IllegalStateException("Already stopped.");
81          }
82          final AbstractExpectation<C, P> expectation = this.createExpectation(condition, action);
83          final Future<Message> future = AbstractExpectationManager.EXECUTOR.submit(expectation);
84          this.expectations.put(expectation, future);
85          AbstractExpectationManager.LOGGER.info("Registered expectation {} with action {}.", expectation, action);
86          return future;
87      }
88  
89      @Override
90      public boolean stop() {
91          if (!this.isStopped.compareAndSet(false, true)) {
92              // already stopped
93              return false;
94          }
95          for (final Future<Message> future : this.expectations.values()) {
96              future.cancel(true);
97          }
98          this.expectations.clear();
99          return true;
100     }
101 
102     /**
103      * Stop tracking this expectation. Calls from the internal code only.
104      *
105      * @param expectation
106      *            The expectation to stop.
107      * @return If stopped, false if stopped already.
108      */
109     protected synchronized boolean unsetExpectation(final AbstractExpectation<C, P> expectation) {
110         if (this.expectations.containsKey(expectation)) {
111             this.expectations.remove(expectation);
112             AbstractExpectationManager.LOGGER.info("Unregistered expectation {}.", expectation);
113             return true;
114         }
115         return false;
116     }
117 
118 }