View Javadoc
1   package com.github.triceo.splitlog;
2   
3   import it.unimi.dsi.fastutil.ints.IntAVLTreeSet;
4   import it.unimi.dsi.fastutil.ints.IntSortedSet;
5   import it.unimi.dsi.fastutil.objects.Object2IntLinkedOpenHashMap;
6   import it.unimi.dsi.fastutil.objects.Object2IntMap;
7   import it.unimi.dsi.fastutil.objects.ObjectLinkedOpenHashSet;
8   
9   import java.util.Collections;
10  import java.util.List;
11  import java.util.Map;
12  import java.util.Set;
13  import java.util.WeakHashMap;
14  
15  import org.slf4j.Logger;
16  
17  import com.github.triceo.splitlog.api.Follower;
18  import com.github.triceo.splitlog.api.LogWatch;
19  import com.github.triceo.splitlog.api.LogWatchBuilder;
20  import com.github.triceo.splitlog.api.Message;
21  import com.github.triceo.splitlog.api.SimpleMessageCondition;
22  import com.github.triceo.splitlog.logging.SplitlogLoggerFactory;
23  
24  final class LogWatchStorageManager {
25  
26      private static final Logger LOGGER = SplitlogLoggerFactory.getLogger(LogWatchStorageManager.class);
27      private final SimpleMessageCondition acceptanceCondition;
28      private final LogWatch logWatch;
29      private final MessageStore messages;
30      private final Object2IntMap<Follower> runningFollowerStartMarks = new Object2IntLinkedOpenHashMap<Follower>();
31      private final LogWatchStorageSweeper sweeping;
32      /**
33       * This map is weak; when a follower stops being used by user code, we do
34       * not want this map to prevent it from being GC'd. Yet, for as long as the
35       * follower is being used, we want to keep the IDs since the follower may
36       * still ask for the messages.
37       */
38      private final Map<Follower, int[]> terminatedFollowerRanges = new WeakHashMap<Follower, int[]>();
39  
40      public LogWatchStorageManager(final LogWatch watch, final LogWatchBuilder builder) {
41          this.logWatch = watch;
42          this.messages = new MessageStore(builder.getCapacityLimit());
43          this.acceptanceCondition = builder.getStorageCondition();
44          this.sweeping = new LogWatchStorageSweeper(this, builder);
45      }
46  
47      public synchronized boolean followerStarted(final Follower follower) {
48          if (this.isFollowerActive(follower) || this.isFollowerTerminated(follower)) {
49              return false;
50          }
51          final int startingMessageId = this.messages.getNextPosition();
52          LogWatchStorageManager.LOGGER.info("First message position is {} for {}.", startingMessageId, follower);
53          this.runningFollowerStartMarks.put(follower, startingMessageId);
54          if (this.runningFollowerStartMarks.size() == 1) {
55              LogWatchStorageManager.LOGGER.info("New follower registered. Messages can be received.");
56          }
57          return true;
58      }
59  
60      public synchronized boolean followerTerminated(final Follower follower) {
61          if (!this.isFollowerActive(follower)) {
62              return false;
63          }
64          final int startingMessageId = this.runningFollowerStartMarks.removeInt(follower);
65          final int endingMessageId = this.messages.getLatestPosition();
66          LogWatchStorageManager.LOGGER.info("Last message position is {} for {}.", endingMessageId, follower);
67          this.terminatedFollowerRanges.put(follower, new int[]{startingMessageId, endingMessageId});
68          if (this.runningFollowerStartMarks.size() == 0) {
69              LogWatchStorageManager.LOGGER.info("Last remaining follower terminated. No messages can be received.");
70          }
71          return true;
72      }
73  
74      /**
75       * Return all messages that have been sent to the follower, from its start
76       * until either its termination or to this moment, whichever is relevant.
77       *
78       * This method is synchronized so that the modification of the underlying
79       * message store in {@link #addLine(String)} and the reading of this store
80       * is mutually excluded. Otherwise, there is a possibility of message ID
81       * mess in the discarding case.
82       *
83       * @param follower
84       *            The follower in question.
85       * @return Unmodifiable list of all the received messages, in the order
86       *         received.
87       */
88      protected synchronized List<Message> getAllMessages(final Follower follower) {
89          final int end = this.getEndingMessageId(follower);
90          /*
91           * If messages have been discarded, the original starting message ID
92           * will no longer be valid. Therefore, we check for the actual starting
93           * ID.
94           */
95          final int start = Math.max(this.messages.getFirstPosition(), this.getStartingMessageId(follower));
96          if (start > end) {
97              /*
98               * in case some messages have been discarded, the actual start may
99               * get ahead of the expected end. this would have caused an
100              * exception within the message store, and so we handle it here and
101              * return an empty list. this is exactly correct, as if the end is
102              * before the first message in the store, there really is nothing to
103              * return.
104              */
105             return Collections.unmodifiableList(Collections.<Message> emptyList());
106         } else {
107             return Collections.unmodifiableList(this.messages.getFromRange(start, end + 1));
108         }
109     }
110 
111     /**
112      * Get index of the last plus one message that the follower has access to.
113      *
114      * @param follower
115      *            Follower in question.
116      * @return Ending message ID after {@link #followerTerminated(Follower)}.
117      *         {@link MessageStore#getLatestPosition()} between
118      *         {@link #followerStarted(Follower)} and
119      *         {@link #followerTerminated(Follower)}. Will throw an exception
120      *         otherwise.
121      */
122     private synchronized int getEndingMessageId(final Follower follower) {
123         if (this.isFollowerActive(follower)) {
124             return this.messages.getLatestPosition();
125         } else if (this.isFollowerTerminated(follower)) {
126             return this.terminatedFollowerRanges.get(follower)[1];
127         } else {
128             throw new IllegalStateException("Follower never before seen.");
129         }
130     }
131 
132     /**
133      * Will crawl the weak hash maps and make sure we always have the latest
134      * information on the availability of messages.
135      *
136      * This method is only intended to be used from within
137      * {@link LogWatchStorageSweeper}.
138      *
139      * @return ID of the very first message that is reachable by any follower in
140      *         this logWatch. -1 when there are no reachable messages.
141      */
142     protected synchronized int getFirstReachableMessageId() {
143         final boolean followersRunning = !this.runningFollowerStartMarks.isEmpty();
144         if (!followersRunning && this.terminatedFollowerRanges.isEmpty()) {
145             // no followers present; no reachable messages
146             return -1;
147         }
148         final IntSortedSet set = new IntAVLTreeSet(this.runningFollowerStartMarks.values());
149         if (!set.isEmpty()) {
150             final int first = this.messages.getFirstPosition();
151             if (set.firstInt() <= first) {
152                 /*
153                  * cannot go below first position; any other calculation
154                  * unnecessary
155                  */
156                 return first;
157             }
158         }
159         for (final int[] pair : this.terminatedFollowerRanges.values()) {
160             set.add(pair[0]);
161         }
162         return set.firstInt();
163     }
164 
165     public LogWatch getLogWatch() {
166         return this.logWatch;
167     }
168 
169     /**
170      * Get access to the underlying message store.
171      *
172      * This method is only intended to be used from within
173      * {@link LogWatchStorageSweeper}.
174      *
175      * @return Message store used by this class.
176      */
177     protected MessageStore getMessageStore() {
178         return this.messages;
179     }
180 
181     /**
182      * For a given follower, return the starting mark.
183      *
184      * @param follower
185      *            Tailer in question.
186      * @return Starting message ID, if after {@link #followerStarted(Follower)}.
187      *         Will throw an exception otherwise.
188      */
189     private synchronized int getStartingMessageId(final Follower follower) {
190         if (this.isFollowerActive(follower)) {
191             return this.runningFollowerStartMarks.getInt(follower);
192         } else if (this.isFollowerTerminated(follower)) {
193             return this.terminatedFollowerRanges.get(follower)[0];
194         } else {
195             throw new IllegalStateException("Follower never before seen.");
196         }
197     }
198 
199     /**
200      * Whether {@link #followerStarted(Follower)} has been called and
201      * {@link #followerTerminated(Follower)} has not.
202      *
203      * @param follower
204      *            Follower in question.
205      * @return True if inbetween those two calls for this particular follower.
206      */
207     public synchronized boolean isFollowerActive(final Follower follower) {
208         return this.runningFollowerStartMarks.containsKey(follower);
209     }
210 
211     /**
212      * Whether or not {@link #followerTerminated(Follower)} was called on the
213      * follower with a positive result.
214      *
215      * @param follower
216      *            Follower in question.
217      * @return True if called and returned true. May be unpredictable, since it
218      *         will look up references to terminated followers within a weak has
219      *         map.
220      */
221     public synchronized boolean isFollowerTerminated(final Follower follower) {
222         return this.terminatedFollowerRanges.containsKey(follower);
223     }
224 
225     /**
226      * Will mean the end of the storage, including the termination of sweeping.
227      */
228     public synchronized void logWatchTerminated() {
229         final Set<Follower> followersToTerminate = new ObjectLinkedOpenHashSet<Follower>(this.runningFollowerStartMarks.keySet());
230         for (final Follower f : followersToTerminate) {
231             this.followerTerminated(f);
232         }
233         this.sweeping.stop();
234     }
235 
236     public synchronized boolean registerMessage(final Message message, final LogWatch source) {
237         if (source != this.logWatch) {
238             throw new IllegalStateException("Sources don't match.");
239         }
240         final boolean messageAccepted = this.acceptanceCondition.accept(message);
241         if (this.runningFollowerStartMarks.size() == 0) {
242             LogWatchStorageManager.LOGGER.info("Message thrown away as there are no followers: {}.", message);
243         } else if (messageAccepted) {
244             LogWatchStorageManager.LOGGER.info("Message '{}' stored into {}.", message, source);
245             this.messages.add(message);
246             this.sweeping.start();
247         }
248         return messageAccepted;
249     }
250 
251 }