View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.util;
20  
21  import java.util.Collection;
22  import java.util.Iterator;
23  import java.util.concurrent.BlockingQueue;
24  import java.util.concurrent.DelayQueue;
25  import java.util.concurrent.Delayed;
26  import java.util.concurrent.TimeUnit;
27  
28  import org.apache.hadoop.hbase.classification.InterfaceAudience;
29  
30  /**
31   * A blocking queue implementation for adding a constant delay. Uses a DelayQueue as a backing store
32   * @param <E> type of elements
33   */
34  @InterfaceAudience.Private
35  public class ConstantDelayQueue<E> implements BlockingQueue<E> {
36  
37    private static final class DelayedElement<T> implements Delayed {
38      T element;
39      long end;
40      public DelayedElement(T element, long delayMs) {
41        this.element = element;
42        this.end = EnvironmentEdgeManager.currentTime() + delayMs;
43      }
44  
45      @Override
46      public int compareTo(Delayed o) {
47        long cmp = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
48        return cmp == 0 ? 0 : ( cmp < 0 ? -1 : 1);
49      }
50  
51      @Override
52      public long getDelay(TimeUnit unit) {
53        return unit.convert(end - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
54      }
55    }
56  
57    private final long delayMs;
58  
59    // backing DelayQueue
60    private DelayQueue<DelayedElement<E>> queue = new DelayQueue<DelayedElement<E>>();
61  
62    public ConstantDelayQueue(TimeUnit timeUnit, long delay) {
63      this.delayMs = TimeUnit.MILLISECONDS.convert(delay, timeUnit);
64    }
65  
66    @Override
67    public E remove() {
68      DelayedElement<E> el = queue.remove();
69      return el == null ? null : el.element;
70    }
71  
72    @Override
73    public E poll() {
74      DelayedElement<E> el = queue.poll();
75      return el == null ? null : el.element;
76    }
77  
78    @Override
79    public E element() {
80      DelayedElement<E> el = queue.element();
81      return el == null ? null : el.element;
82    }
83  
84    @Override
85    public E peek() {
86      DelayedElement<E> el = queue.peek();
87      return el == null ? null : el.element;
88    }
89  
90    @Override
91    public int size() {
92      return queue.size();
93    }
94  
95    @Override
96    public boolean isEmpty() {
97      return queue.isEmpty();
98    }
99  
100   @Override
101   public Iterator<E> iterator() {
102     throw new UnsupportedOperationException(); // not implemented yet
103   }
104 
105   @Override
106   public Object[] toArray() {
107     throw new UnsupportedOperationException(); // not implemented yet
108   }
109 
110   @Override
111   public <T> T[] toArray(T[] a) {
112     throw new UnsupportedOperationException(); // not implemented yet
113   }
114 
115   @Override
116   public boolean containsAll(Collection<?> c) {
117     throw new UnsupportedOperationException(); // not implemented yet
118   }
119 
120   @Override
121   public boolean addAll(Collection<? extends E> c) {
122     throw new UnsupportedOperationException(); // not implemented yet
123   }
124 
125   @Override
126   public boolean removeAll(Collection<?> c) {
127     throw new UnsupportedOperationException(); // not implemented yet
128   }
129 
130   @Override
131   public boolean retainAll(Collection<?> c) {
132     throw new UnsupportedOperationException(); // not implemented yet
133   }
134 
135   @Override
136   public void clear() {
137     queue.clear();
138   }
139 
140   @Override
141   public boolean add(E e) {
142     return queue.add(new DelayedElement<E>(e, delayMs));
143   }
144 
145   @Override
146   public boolean offer(E e) {
147     return queue.offer(new DelayedElement<E>(e, delayMs));
148   }
149 
150   @Override
151   public void put(E e) throws InterruptedException {
152     queue.put(new DelayedElement<E>(e, delayMs));
153   }
154 
155   @Override
156   public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
157     return queue.offer(new DelayedElement<E>(e, delayMs), timeout, unit);
158   }
159 
160   @Override
161   public E take() throws InterruptedException {
162     DelayedElement<E> el = queue.take();
163     return el == null ? null : el.element;
164   }
165 
166   @Override
167   public E poll(long timeout, TimeUnit unit) throws InterruptedException {
168     DelayedElement<E> el = queue.poll(timeout, unit);
169     return el == null ? null : el.element;
170   }
171 
172   @Override
173   public int remainingCapacity() {
174     return queue.remainingCapacity();
175   }
176 
177   @Override
178   public boolean remove(Object o) {
179     throw new UnsupportedOperationException(); // not implemented yet
180   }
181 
182   @Override
183   public boolean contains(Object o) {
184     throw new UnsupportedOperationException(); // not implemented yet
185   }
186 
187   @Override
188   public int drainTo(Collection<? super E> c) {
189     throw new UnsupportedOperationException(); // not implemented yet
190   }
191 
192   @Override
193   public int drainTo(Collection<? super E> c, int maxElements) {
194     throw new UnsupportedOperationException(); // not implemented yet
195   }
196 }