1 package com.github.triceo.splitlog;
2
3 import it.unimi.dsi.fastutil.ints.IntAVLTreeSet;
4 import it.unimi.dsi.fastutil.ints.IntSortedSet;
5 import it.unimi.dsi.fastutil.objects.Object2IntLinkedOpenHashMap;
6 import it.unimi.dsi.fastutil.objects.Object2IntMap;
7 import it.unimi.dsi.fastutil.objects.ObjectLinkedOpenHashSet;
8
9 import java.util.Collections;
10 import java.util.List;
11 import java.util.Map;
12 import java.util.Set;
13 import java.util.WeakHashMap;
14
15 import org.slf4j.Logger;
16
17 import com.github.triceo.splitlog.api.Follower;
18 import com.github.triceo.splitlog.api.LogWatch;
19 import com.github.triceo.splitlog.api.LogWatchBuilder;
20 import com.github.triceo.splitlog.api.Message;
21 import com.github.triceo.splitlog.api.SimpleMessageCondition;
22 import com.github.triceo.splitlog.logging.SplitlogLoggerFactory;
23
24 final class LogWatchStorageManager {
25
26 private static final Logger LOGGER = SplitlogLoggerFactory.getLogger(LogWatchStorageManager.class);
27 private final SimpleMessageCondition acceptanceCondition;
28 private final LogWatch logWatch;
29 private final MessageStore messages;
30 private final Object2IntMap<Follower> runningFollowerStartMarks = new Object2IntLinkedOpenHashMap<Follower>();
31 private final LogWatchStorageSweeper sweeping;
32
33
34
35
36
37
38 private final Map<Follower, int[]> terminatedFollowerRanges = new WeakHashMap<Follower, int[]>();
39
40 public LogWatchStorageManager(final LogWatch watch, final LogWatchBuilder builder) {
41 this.logWatch = watch;
42 this.messages = new MessageStore(builder.getCapacityLimit());
43 this.acceptanceCondition = builder.getStorageCondition();
44 this.sweeping = new LogWatchStorageSweeper(this, builder);
45 }
46
47 public synchronized boolean followerStarted(final Follower follower) {
48 if (this.isFollowerActive(follower) || this.isFollowerTerminated(follower)) {
49 return false;
50 }
51 final int startingMessageId = this.messages.getNextPosition();
52 LogWatchStorageManager.LOGGER.info("First message position is {} for {}.", startingMessageId, follower);
53 this.runningFollowerStartMarks.put(follower, startingMessageId);
54 if (this.runningFollowerStartMarks.size() == 1) {
55 LogWatchStorageManager.LOGGER.info("New follower registered. Messages can be received.");
56 }
57 return true;
58 }
59
60 public synchronized boolean followerTerminated(final Follower follower) {
61 if (!this.isFollowerActive(follower)) {
62 return false;
63 }
64 final int startingMessageId = this.runningFollowerStartMarks.removeInt(follower);
65 final int endingMessageId = this.messages.getLatestPosition();
66 LogWatchStorageManager.LOGGER.info("Last message position is {} for {}.", endingMessageId, follower);
67 this.terminatedFollowerRanges.put(follower, new int[]{startingMessageId, endingMessageId});
68 if (this.runningFollowerStartMarks.size() == 0) {
69 LogWatchStorageManager.LOGGER.info("Last remaining follower terminated. No messages can be received.");
70 }
71 return true;
72 }
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88 protected synchronized List<Message> getAllMessages(final Follower follower) {
89 final int end = this.getEndingMessageId(follower);
90
91
92
93
94
95 final int start = Math.max(this.messages.getFirstPosition(), this.getStartingMessageId(follower));
96 if (start > end) {
97
98
99
100
101
102
103
104
105 return Collections.unmodifiableList(Collections.<Message> emptyList());
106 } else {
107 return Collections.unmodifiableList(this.messages.getFromRange(start, end + 1));
108 }
109 }
110
111
112
113
114
115
116
117
118
119
120
121
122 private synchronized int getEndingMessageId(final Follower follower) {
123 if (this.isFollowerActive(follower)) {
124 return this.messages.getLatestPosition();
125 } else if (this.isFollowerTerminated(follower)) {
126 return this.terminatedFollowerRanges.get(follower)[1];
127 } else {
128 throw new IllegalStateException("Follower never before seen.");
129 }
130 }
131
132
133
134
135
136
137
138
139
140
141
142 protected synchronized int getFirstReachableMessageId() {
143 final boolean followersRunning = !this.runningFollowerStartMarks.isEmpty();
144 if (!followersRunning && this.terminatedFollowerRanges.isEmpty()) {
145
146 return -1;
147 }
148 final IntSortedSet set = new IntAVLTreeSet(this.runningFollowerStartMarks.values());
149 if (!set.isEmpty()) {
150 final int first = this.messages.getFirstPosition();
151 if (set.firstInt() <= first) {
152
153
154
155
156 return first;
157 }
158 }
159 for (final int[] pair : this.terminatedFollowerRanges.values()) {
160 set.add(pair[0]);
161 }
162 return set.firstInt();
163 }
164
165 public LogWatch getLogWatch() {
166 return this.logWatch;
167 }
168
169
170
171
172
173
174
175
176
177 protected MessageStore getMessageStore() {
178 return this.messages;
179 }
180
181
182
183
184
185
186
187
188
189 private synchronized int getStartingMessageId(final Follower follower) {
190 if (this.isFollowerActive(follower)) {
191 return this.runningFollowerStartMarks.getInt(follower);
192 } else if (this.isFollowerTerminated(follower)) {
193 return this.terminatedFollowerRanges.get(follower)[0];
194 } else {
195 throw new IllegalStateException("Follower never before seen.");
196 }
197 }
198
199
200
201
202
203
204
205
206
207 public synchronized boolean isFollowerActive(final Follower follower) {
208 return this.runningFollowerStartMarks.containsKey(follower);
209 }
210
211
212
213
214
215
216
217
218
219
220
221 public synchronized boolean isFollowerTerminated(final Follower follower) {
222 return this.terminatedFollowerRanges.containsKey(follower);
223 }
224
225
226
227
228 public synchronized void logWatchTerminated() {
229 final Set<Follower> followersToTerminate = new ObjectLinkedOpenHashSet<Follower>(this.runningFollowerStartMarks.keySet());
230 for (final Follower f : followersToTerminate) {
231 this.followerTerminated(f);
232 }
233 this.sweeping.stop();
234 }
235
236 public synchronized boolean registerMessage(final Message message, final LogWatch source) {
237 if (source != this.logWatch) {
238 throw new IllegalStateException("Sources don't match.");
239 }
240 final boolean messageAccepted = this.acceptanceCondition.accept(message);
241 if (this.runningFollowerStartMarks.size() == 0) {
242 LogWatchStorageManager.LOGGER.info("Message thrown away as there are no followers: {}.", message);
243 } else if (messageAccepted) {
244 LogWatchStorageManager.LOGGER.info("Message '{}' stored into {}.", message, source);
245 this.messages.add(message);
246 this.sweeping.start();
247 }
248 return messageAccepted;
249 }
250
251 }