View Javadoc

1   package com.github.triceo.splitlog.expectations;
2   
3   import java.util.HashSet;
4   import java.util.Set;
5   import java.util.concurrent.ExecutorService;
6   import java.util.concurrent.Executors;
7   import java.util.concurrent.Future;
8   import java.util.concurrent.ThreadFactory;
9   import java.util.concurrent.atomic.AtomicLong;
10  
11  import com.github.triceo.splitlog.api.Message;
12  import com.github.triceo.splitlog.api.MessageAction;
13  import com.github.triceo.splitlog.api.MessageConsumer;
14  import com.github.triceo.splitlog.api.MessageDeliveryStatus;
15  import com.github.triceo.splitlog.api.MessageProducer;
16  
17  /**
18   * Tracks expectations that are currently active for a given message consumer.
19   *
20   * @param <P>
21   *            The source for the messages.
22   * @param <C>
23   *            The type of condition that the expectations accept.
24   */
25  abstract class AbstractExpectationManager<P extends MessageProducer<P>, C> implements MessageConsumer<P> {
26  
27      private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool(new ThreadFactory() {
28  
29          private final ThreadGroup group = new ThreadGroup("expectations");
30          private final AtomicLong nextId = new AtomicLong(0);
31  
32          @Override
33          public Thread newThread(final Runnable r) {
34              return new Thread(this.group, r, this.group.getName() + "-" + this.nextId.incrementAndGet());
35          }
36  
37      });
38      private final Set<AbstractExpectation<C, P>> exchanges = new HashSet<AbstractExpectation<C, P>>();
39      private boolean isStopped = false;
40  
41      protected abstract AbstractExpectation<C, P> createExpectation(final C condition, final MessageAction<P> action);
42  
43      @Override
44      public boolean isStopped() {
45          return this.isStopped;
46      }
47  
48      @Override
49      public synchronized void
50      messageReceived(final Message message, final MessageDeliveryStatus status, final P producer) {
51          if (this.isStopped()) {
52              throw new IllegalStateException("Already stopped.");
53          }
54          for (final AbstractExpectation<C, P> exchange : this.exchanges) {
55              exchange.messageReceived(message, status, producer);
56          }
57      }
58  
59      /**
60       * The resulting future will only return after such a message is received
61       * that makes the condition true.
62       *
63       * @param condition
64       *            Condition to be true.
65       * @return The future.
66       */
67      public synchronized Future<Message> setExpectation(final C condition) {
68          return this.setExpectation(condition, null);
69      }
70  
71      /**
72       * The resulting future will only return after such a message is received
73       * that makes the condition true.
74       *
75       * @param condition
76       *            Condition to be true.
77       * @return The future.
78       */
79      public synchronized Future<Message> setExpectation(final C condition, final MessageAction<P> action) {
80          if (this.isStopped()) {
81              throw new IllegalStateException("Already stopped.");
82          }
83          final AbstractExpectation<C, P> exchange = this.createExpectation(condition, action);
84          this.exchanges.add(exchange);
85          return AbstractExpectationManager.EXECUTOR.submit(exchange);
86      }
87  
88      @Override
89      public boolean stop() {
90          if (this.isStopped()) {
91              return false;
92          }
93          this.exchanges.clear();
94          this.isStopped = true;
95          return true;
96      }
97  
98      /**
99       * Stop tracking this expectation. Calls from the internal code only.
100      *
101      * @param expectation
102      *            The expectation to stop.
103      * @return If stopped, false if stopped already.
104      */
105     protected synchronized boolean unsetExpectation(final AbstractExpectation<C, P> expectation) {
106         return this.exchanges.remove(expectation);
107     }
108 
109 }