View Javadoc

1   package com.github.triceo.splitlog;
2   
3   import java.io.OutputStream;
4   import java.util.SortedSet;
5   import java.util.concurrent.Future;
6   import java.util.concurrent.TimeUnit;
7   import java.util.concurrent.atomic.AtomicLong;
8   
9   import com.github.triceo.splitlog.api.CommonFollower;
10  import com.github.triceo.splitlog.api.LogWatch;
11  import com.github.triceo.splitlog.api.Message;
12  import com.github.triceo.splitlog.api.MessageAction;
13  import com.github.triceo.splitlog.api.MessageComparator;
14  import com.github.triceo.splitlog.api.MessageConsumer;
15  import com.github.triceo.splitlog.api.MessageFormatter;
16  import com.github.triceo.splitlog.api.MessageListener;
17  import com.github.triceo.splitlog.api.MessageMeasure;
18  import com.github.triceo.splitlog.api.MessageMetric;
19  import com.github.triceo.splitlog.api.MessageProducer;
20  import com.github.triceo.splitlog.api.MidDeliveryMessageCondition;
21  import com.github.triceo.splitlog.api.SimpleMessageCondition;
22  import com.github.triceo.splitlog.conditions.AllFollowerMessagesAcceptingCondition;
23  import com.github.triceo.splitlog.expectations.MidDeliveryExpectationManager;
24  import com.github.triceo.splitlog.ordering.OriginalOrderingMessageComprator;
25  
26  /**
27   * Internal API for a log follower that, on top of the public API, provides ways
28   * for {@link LogWatch} of notifying the follower of new messages. Every
29   * follower implementation, such as {@link NonStoringFollower}, needs to extend
30   * this class.
31   *
32   * Will use {@link #getDefaultFormatter()} as default message formatter. Will
33   * use {@value #DEFAULT_CONDITION} as a default in getMessages() and write()
34   * methods. Will use {@link #DEFAULT_COMPARATOR} as a default order for the
35   * messages.
36   */
37  abstract class AbstractCommonFollower<P extends MessageProducer<P>, C extends MessageProducer<C>> implements
38  CommonFollower<P, C>, ConsumerRegistrar<P> {
39  
40      private static final MessageComparator DEFAULT_COMPARATOR = OriginalOrderingMessageComprator.INSTANCE;
41      private static final SimpleMessageCondition DEFAULT_CONDITION = AllFollowerMessagesAcceptingCondition.INSTANCE;
42  
43      private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
44  
45      private final MidDeliveryExpectationManager<C> expectations = new MidDeliveryExpectationManager<C>();
46  
47      private final long uniqueId = AbstractCommonFollower.ID_GENERATOR.getAndIncrement();
48  
49      @Override
50      public int countConsumers() {
51          return this.getConsumerManager().countConsumers();
52      }
53  
54      @Override
55      public int countMetrics() {
56          return this.getConsumerManager().countMetrics();
57      }
58  
59      @Override
60      public Future<Message> expect(final MidDeliveryMessageCondition<C> condition) {
61          return this.expectations.setExpectation(condition);
62      }
63  
64      @Override
65      public Future<Message> expect(final MidDeliveryMessageCondition<C> condition, final MessageAction<C> action) {
66          return this.expectations.setExpectation(condition, action);
67      }
68  
69      protected abstract ConsumerManager<P> getConsumerManager();
70  
71      /**
72       * Provide the default formatter for messages in this follower.
73       *
74       * @return Formatter to use on messages.
75       */
76      protected abstract MessageFormatter getDefaultFormatter();
77  
78      protected MidDeliveryExpectationManager<C> getExpectationManager() {
79          return this.expectations;
80      }
81  
82      @Override
83      public SortedSet<Message> getMessages() {
84          return this.getMessages(AbstractCommonFollower.DEFAULT_COMPARATOR);
85      }
86  
87      @Override
88      public SortedSet<Message> getMessages(final MessageComparator order) {
89          return this.getMessages(AbstractCommonFollower.DEFAULT_CONDITION, order);
90      }
91  
92      @Override
93      public SortedSet<Message> getMessages(final SimpleMessageCondition condition) {
94          return this.getMessages(condition, AbstractCommonFollower.DEFAULT_COMPARATOR);
95      }
96  
97      @Override
98      public MessageMetric<? extends Number, P> getMetric(final String id) {
99          return this.getConsumerManager().getMetric(id);
100     }
101 
102     @Override
103     public String getMetricId(final MessageMetric<? extends Number, P> measure) {
104         return this.getConsumerManager().getMetricId(measure);
105     }
106 
107     public long getUniqueId() {
108         return this.uniqueId;
109     }
110 
111     @Override
112     public boolean isConsuming(final MessageConsumer<P> consumer) {
113         return this.getConsumerManager().isConsuming(consumer);
114     }
115 
116     @Override
117     public boolean isMeasuring(final MessageMetric<? extends Number, P> metric) {
118         return this.getConsumerManager().isMeasuring(metric);
119     }
120 
121     @Override
122     public boolean isMeasuring(final String id) {
123         return this.getConsumerManager().isMeasuring(id);
124     }
125 
126     @Override
127     public boolean registerConsumer(final MessageConsumer<P> consumer) {
128         return this.getConsumerManager().registerConsumer(consumer);
129     }
130 
131     @Override
132     public MessageConsumer<P> startConsuming(final MessageListener<P> listener) {
133         return this.getConsumerManager().startConsuming(listener);
134     }
135 
136     @Override
137     public <T extends Number> MessageMetric<T, P> startMeasuring(final MessageMeasure<T, P> measure, final String id) {
138         return this.startMeasuring(measure, id, true);
139     }
140 
141     protected synchronized <T extends Number> MessageMetric<T, P> startMeasuring(final MessageMeasure<T, P> measure,
142             final String id, final boolean checkIfFollowing) {
143         if (checkIfFollowing && this.isStopped()) {
144             throw new IllegalStateException("Cannot start measurement as the follower is no longer active.");
145         }
146         return this.getConsumerManager().startMeasuring(measure, id);
147     }
148 
149     @Override
150     public boolean stopConsuming(final MessageConsumer<P> consumer) {
151         return this.getConsumerManager().stopConsuming(consumer);
152     }
153 
154     @Override
155     public boolean stopMeasuring(final MessageMetric<? extends Number, P> metric) {
156         return this.getConsumerManager().stopMeasuring(metric);
157     }
158 
159     @Override
160     public boolean stopMeasuring(final String id) {
161         return this.getConsumerManager().stopMeasuring(id);
162     }
163 
164     @Override
165     public Message waitFor(final MidDeliveryMessageCondition<C> condition) {
166         try {
167             return this.expect(condition).get();
168         } catch (final Exception e) {
169             return null;
170         }
171     }
172 
173     @Override
174     public Message waitFor(final MidDeliveryMessageCondition<C> condition, final long timeout, final TimeUnit unit) {
175         try {
176             return this.expect(condition).get(timeout, unit);
177         } catch (final Exception e) {
178             return null;
179         }
180     }
181 
182     @Override
183     public boolean write(final OutputStream stream) {
184         return this.write(stream, this.getDefaultFormatter());
185     }
186 
187     @Override
188     public boolean write(final OutputStream stream, final MessageComparator order) {
189         return this.write(stream, order, this.getDefaultFormatter());
190     }
191 
192     @Override
193     public boolean write(final OutputStream stream, final MessageComparator order, final MessageFormatter formatter) {
194         return this.write(stream, AbstractCommonFollower.DEFAULT_CONDITION, order, formatter);
195     }
196 
197     @Override
198     public boolean write(final OutputStream stream, final MessageFormatter formatter) {
199         return this.write(stream, AbstractCommonFollower.DEFAULT_CONDITION, formatter);
200     }
201 
202     @Override
203     public boolean write(final OutputStream stream, final SimpleMessageCondition condition) {
204         return this.write(stream, condition, this.getDefaultFormatter());
205     }
206 
207     @Override
208     public boolean write(final OutputStream stream, final SimpleMessageCondition condition,
209         final MessageComparator order) {
210         return this.write(stream, condition, order, this.getDefaultFormatter());
211     }
212 
213     @Override
214     public boolean write(final OutputStream stream, final SimpleMessageCondition condition,
215         final MessageFormatter formatter) {
216         return this.write(stream, condition, AbstractCommonFollower.DEFAULT_COMPARATOR, formatter);
217     }
218 
219 }