View Javadoc
1   package com.github.triceo.splitlog;
2   
3   import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap;
4   import it.unimi.dsi.fastutil.ints.Int2ObjectSortedMap;
5   
6   import java.util.Collections;
7   import java.util.LinkedList;
8   import java.util.List;
9   import java.util.SortedMap;
10  import java.util.concurrent.atomic.AtomicInteger;
11  
12  import org.slf4j.Logger;
13  
14  import com.github.triceo.splitlog.api.LogWatch;
15  import com.github.triceo.splitlog.api.Message;
16  import com.github.triceo.splitlog.logging.SplitlogLoggerFactory;
17  
18  /**
19   * Data storage for a particular {@link LogWatch}.
20   *
21   * This class is thread-safe.
22   */
23  final class MessageStore {
24  
25      public static final int INITIAL_MESSAGE_POSITION = 0;
26      private static final Logger LOGGER = SplitlogLoggerFactory.getLogger(MessageStore.class);
27  
28      private final int messageLimit;
29      private final AtomicInteger nextMessagePosition = new AtomicInteger(MessageStore.INITIAL_MESSAGE_POSITION);
30      private final Int2ObjectSortedMap<Message> store = new Int2ObjectAVLTreeMap<Message>();
31  
32      /**
33       * Create a message store with a maximum capacity of
34       * {@link Integer#MAX_VALUE} messages. Will not actually allocate all that
35       * space, but instead will keep growing as necessary.
36       */
37      public MessageStore() {
38          this(Integer.MAX_VALUE);
39      }
40  
41      /**
42       * Create a message store with a given message capacity. Will not actually
43       * allocate all that space, but instead will keep growing as necessary.
44       */
45      public MessageStore(final int size) {
46          if (size <= 0) {
47              throw new IllegalArgumentException("The message storage cannot have 0 or less capacity.");
48          } else {
49              this.messageLimit = size;
50          }
51      }
52  
53      /**
54       * Add message to the storage.
55       *
56       * Every call will change values returned by {@link #getNextPosition()} and
57       * {@link #getLatestPosition()}. Any call may change value returned by
58       * {@link #getFirstPosition()}, which will happen if a message is discarded
59       * due to hitting the message store capacity.
60       *
61       * @param msg
62       *            Message in question.
63       * @return Position of the message.
64       */
65      public synchronized int add(final Message msg) {
66          final int nextKey = this.getNextPosition();
67          this.store.put(nextKey, msg);
68          MessageStore.LOGGER.info("Message #{} stored on position #{}", msg.getUniqueId(), nextKey);
69          this.nextMessagePosition.incrementAndGet();
70          if (this.store.size() > this.messageLimit) {
71              // discard first message if we're over limit
72              this.store.remove(this.store.firstKey());
73          }
74          return nextKey;
75      }
76  
77      /**
78       * The maximum number of messages that will be held by this store at a time.
79       * When a message is added that pushes the store over the limit, first
80       * inserted message will be removed.
81       *
82       * @return Maximum possible amount of messages this store can hold before it
83       *         starts discarding messages.
84       */
85      public int capacity() {
86          return this.messageLimit;
87      }
88  
89      /**
90       * Remove messages from the queue that come before the given position. If
91       * the ID is larger than {@link #getLatestPosition()}, all messages will be
92       * discarded while marking no future messages for discarding.
93       *
94       * @param firstPositionNotToDiscard
95       *            Messages be kept from this position onward, inclusive.
96       * @return Number of messages actually discarded.
97       */
98      public synchronized int discardBefore(final int firstPositionNotToDiscard) {
99          final int firstMessagePosition = this.getFirstPosition();
100         if (this.getNextPosition() == MessageStore.INITIAL_MESSAGE_POSITION) {
101             MessageStore.LOGGER.info("Not discarding any messages, as there haven't been any messages yet.");
102             return 0;
103         } else if ((firstPositionNotToDiscard < MessageStore.INITIAL_MESSAGE_POSITION)
104                 || (firstPositionNotToDiscard <= firstMessagePosition)) {
105             MessageStore.LOGGER.info(
106                     "Not discarding any messages, as there are no messages with position lower than {}.",
107                     firstPositionNotToDiscard);
108             return 0;
109         } else if (firstPositionNotToDiscard > this.getLatestPosition()) {
110             MessageStore.LOGGER.info("Discarding all messages, as all have lower positions than {}.",
111                     firstPositionNotToDiscard);
112             final int size = this.store.size();
113             this.store.clear();
114             return size;
115         }
116         // and now actually discard
117         MessageStore.LOGGER.info("Discarding messages in positions <{},{}).", firstMessagePosition,
118                 firstPositionNotToDiscard);
119         final SortedMap<Integer, Message> toDiscard = this.store.headMap(firstPositionNotToDiscard);
120         final int size = toDiscard.size();
121         toDiscard.clear();
122         return size;
123     }
124 
125     /**
126      * Return all messages currently present.
127      *
128      * @return Unmodifiable list containing those messages.
129      */
130     public List<Message> getAll() {
131         final int firstMessagePosition = this.getFirstPosition();
132         if (firstMessagePosition < MessageStore.INITIAL_MESSAGE_POSITION) {
133             return Collections.unmodifiableList(Collections.<Message> emptyList());
134         }
135         return this.getFrom(firstMessagePosition);
136     }
137 
138     /**
139      * The first position that is occupied by a message.
140      *
141      * @return -1 if no messages yet. 0 if no messages have been discarded. Add
142      *         one for every discarded message.
143      */
144     public synchronized int getFirstPosition() {
145         if (this.store.isEmpty()) {
146             return MessageStore.INITIAL_MESSAGE_POSITION - 1;
147         } else {
148             return this.store.firstIntKey();
149         }
150     }
151 
152     /**
153      * Return all messages on positions higher or equal to the given.
154      *
155      * @param startPosition
156      *            Least position, inclusive.
157      * @return Unmodifiable list containing those messages.
158      */
159     public List<Message> getFrom(final int startPosition) {
160         return this.getFromRange(startPosition, this.getNextPosition());
161     }
162 
163     /**
164      * Return all messages on positions in the given range.
165      *
166      * @param startPosition
167      *            Least position, inclusive.
168      * @param endPosition
169      *            Greatest position, exclusive.
170      * @return Unmodifiable list containing those messages.
171      */
172     public synchronized List<Message> getFromRange(final int startPosition, final int endPosition) {
173         // cache this here, so all parts of the method operate on the same data
174         final int firstMessageId = this.getFirstPosition();
175         // input validation
176         if ((startPosition < MessageStore.INITIAL_MESSAGE_POSITION)
177                 || (endPosition < MessageStore.INITIAL_MESSAGE_POSITION)) {
178             throw new IllegalArgumentException("Message position cannot be negative.");
179         } else if ((firstMessageId >= MessageStore.INITIAL_MESSAGE_POSITION) && (startPosition < firstMessageId)) {
180             throw new IllegalArgumentException("Message at position " + startPosition
181                     + " had already been discarded. First available message position is " + firstMessageId + ".");
182         } else if (endPosition <= startPosition) {
183             throw new IllegalArgumentException("Ending position must be larger than starting message position.");
184         } else if (endPosition > this.getNextPosition()) {
185             throw new IllegalArgumentException("Range end cannot be greater than the next message position.");
186         }
187         // and properly synchronized range retrieval
188         return Collections.unmodifiableList(new LinkedList<Message>(this.store.subMap(startPosition, endPosition)
189                 .values()));
190     }
191 
192     /**
193      * The latest position that has already been filled with a message.
194      *
195      * @return -1 if no messages yet.
196      */
197     public synchronized int getLatestPosition() {
198         return (this.store.isEmpty()) ? this.nextMessagePosition.get() - 1 : this.store.lastIntKey();
199     }
200 
201     /**
202      * The position that will be occupied by the message that goes through the
203      * very next {@link #add(Message)} call.
204      *
205      * @return 0 if no messages have been inserted yet.
206      */
207     public int getNextPosition() {
208         return this.nextMessagePosition.get();
209     }
210 
211     /**
212      * Whether or not this message store currently holds any messages.
213      *
214      * @return True if so, false if not. Will be false even if there were some
215      *         messages before that got discarded and now there are none.
216      */
217     public synchronized boolean isEmpty() {
218         return this.store.isEmpty();
219     }
220 
221     /**
222      * How many messages are currently stored here.
223      *
224      * It is the number of messages that went through {@link #add(Message)} and
225      * that have not been discarded since, either through
226      * {@link #discardBefore(int)} or automatically due to capacity.
227      *
228      * @return
229      */
230     public synchronized int size() {
231         return this.store.size();
232     }
233 
234 }