1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
21 import java.util.Collection;
22 import java.util.Iterator;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.DelayQueue;
25 import java.util.concurrent.Delayed;
26 import java.util.concurrent.TimeUnit;
27
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29
30
31
32
33
34 @InterfaceAudience.Private
35 public class ConstantDelayQueue<E> implements BlockingQueue<E> {
36
37 private static final class DelayedElement<T> implements Delayed {
38 T element;
39 long end;
40 public DelayedElement(T element, long delayMs) {
41 this.element = element;
42 this.end = EnvironmentEdgeManager.currentTime() + delayMs;
43 }
44
45 @Override
46 public int compareTo(Delayed o) {
47 long cmp = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
48 return cmp == 0 ? 0 : ( cmp < 0 ? -1 : 1);
49 }
50
51 @Override
52 public long getDelay(TimeUnit unit) {
53 return unit.convert(end - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
54 }
55 }
56
57 private final long delayMs;
58
59
60 private DelayQueue<DelayedElement<E>> queue = new DelayQueue<DelayedElement<E>>();
61
62 public ConstantDelayQueue(TimeUnit timeUnit, long delay) {
63 this.delayMs = TimeUnit.MILLISECONDS.convert(delay, timeUnit);
64 }
65
66 @Override
67 public E remove() {
68 DelayedElement<E> el = queue.remove();
69 return el == null ? null : el.element;
70 }
71
72 @Override
73 public E poll() {
74 DelayedElement<E> el = queue.poll();
75 return el == null ? null : el.element;
76 }
77
78 @Override
79 public E element() {
80 DelayedElement<E> el = queue.element();
81 return el == null ? null : el.element;
82 }
83
84 @Override
85 public E peek() {
86 DelayedElement<E> el = queue.peek();
87 return el == null ? null : el.element;
88 }
89
90 @Override
91 public int size() {
92 return queue.size();
93 }
94
95 @Override
96 public boolean isEmpty() {
97 return queue.isEmpty();
98 }
99
100 @Override
101 public Iterator<E> iterator() {
102 throw new UnsupportedOperationException();
103 }
104
105 @Override
106 public Object[] toArray() {
107 throw new UnsupportedOperationException();
108 }
109
110 @Override
111 public <T> T[] toArray(T[] a) {
112 throw new UnsupportedOperationException();
113 }
114
115 @Override
116 public boolean containsAll(Collection<?> c) {
117 throw new UnsupportedOperationException();
118 }
119
120 @Override
121 public boolean addAll(Collection<? extends E> c) {
122 throw new UnsupportedOperationException();
123 }
124
125 @Override
126 public boolean removeAll(Collection<?> c) {
127 throw new UnsupportedOperationException();
128 }
129
130 @Override
131 public boolean retainAll(Collection<?> c) {
132 throw new UnsupportedOperationException();
133 }
134
135 @Override
136 public void clear() {
137 queue.clear();
138 }
139
140 @Override
141 public boolean add(E e) {
142 return queue.add(new DelayedElement<E>(e, delayMs));
143 }
144
145 @Override
146 public boolean offer(E e) {
147 return queue.offer(new DelayedElement<E>(e, delayMs));
148 }
149
150 @Override
151 public void put(E e) throws InterruptedException {
152 queue.put(new DelayedElement<E>(e, delayMs));
153 }
154
155 @Override
156 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
157 return queue.offer(new DelayedElement<E>(e, delayMs), timeout, unit);
158 }
159
160 @Override
161 public E take() throws InterruptedException {
162 DelayedElement<E> el = queue.take();
163 return el == null ? null : el.element;
164 }
165
166 @Override
167 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
168 DelayedElement<E> el = queue.poll(timeout, unit);
169 return el == null ? null : el.element;
170 }
171
172 @Override
173 public int remainingCapacity() {
174 return queue.remainingCapacity();
175 }
176
177 @Override
178 public boolean remove(Object o) {
179 throw new UnsupportedOperationException();
180 }
181
182 @Override
183 public boolean contains(Object o) {
184 throw new UnsupportedOperationException();
185 }
186
187 @Override
188 public int drainTo(Collection<? super E> c) {
189 throw new UnsupportedOperationException();
190 }
191
192 @Override
193 public int drainTo(Collection<? super E> c, int maxElements) {
194 throw new UnsupportedOperationException();
195 }
196 }