/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;

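/**
 * Tests for {@link DefaultOperationQuota}: the scan read consumption estimate and the behaviour
 * of batch quota checks against read/write number and size throttles. A
 * {@link ManualEnvironmentEdge} is injected for the quotas package so that throttle refills can
 * be driven deterministically by advancing the clock instead of sleeping.
 */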
@Category({ RegionServerTests.class, SmallTests.class })
public class TestDefaultOperationQuota {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestDefaultOperationQuota.class);

  private static final int DEFAULT_REQUESTS_PER_SECOND = 1000;
  private static ManualEnvironmentEdge envEdge = new ManualEnvironmentEdge();
  static {
    envEdge.setValue(EnvironmentEdgeManager.currentTime());
    // only activate the envEdge for the quotas package
    EnvironmentEdgeManagerTestHelper.injectEdgeForPackage(envEdge,
      ThrottleQuotaTestUtil.class.getPackage().getName());
  }

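  /*
   * The scan estimate tests below exercise DefaultOperationQuota.getScanReadConsumeEstimate: a
   * brand new scanner (nextCallSeq == 0) is charged a single block, a flat or shrinking workload
   * is charged its observed maxBlockBytesScanned, and a workload whose block bytes scanned keeps
   * growing is deliberately overestimated, up to maxScannerResultSize.
   */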
  @Test
  public void testScanEstimateNewScanner() {
    long blockSize = 64 * 1024;
    long nextCallSeq = 0;
    long maxScannerResultSize = 100 * 1024 * 1024;
    long maxBlockBytesScanned = 0;
    long prevBBSDifference = 0;
    long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq,
      maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);

    // new scanner should estimate scan read as 1 block
    assertEquals(blockSize, estimate);
  }

  @Test
  public void testScanEstimateSecondNextCall() {
    long blockSize = 64 * 1024;
    long nextCallSeq = 1;
    long maxScannerResultSize = 100 * 1024 * 1024;
    long maxBlockBytesScanned = 10 * blockSize;
    long prevBBSDifference = 10 * blockSize;
    long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq,
      maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);

    // 2nd next call should be estimated at maxBBS
    assertEquals(maxBlockBytesScanned, estimate);
  }

  @Test
  public void testScanEstimateFlatWorkload() {
    long blockSize = 64 * 1024;
    long nextCallSeq = 100;
    long maxScannerResultSize = 100 * 1024 * 1024;
    long maxBlockBytesScanned = 10 * blockSize;
    long prevBBSDifference = 0;
    long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq,
      maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);

    // flat workload should not overestimate
    assertEquals(maxBlockBytesScanned, estimate);
  }

  @Test
  public void testScanEstimateVariableFlatWorkload() {
    long blockSize = 64 * 1024;
    long nextCallSeq = 1;
    long maxScannerResultSize = 100 * 1024 * 1024;
    long maxBlockBytesScanned = 10 * blockSize;
    long prevBBSDifference = 0;
    for (int i = 0; i < 100; i++) {
      long variation = Math.round(Math.random() * blockSize);
      if (variation % 2 == 0) {
        variation *= -1;
      }
      // despite +/- <1 block variation, we consider this workload flat
      prevBBSDifference = variation;

      long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq + i,
        maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);

      // flat workload should not overestimate
      assertEquals(maxBlockBytesScanned, estimate);
    }
  }

  @Test
  public void testScanEstimateGrowingWorkload() {
    long blockSize = 64 * 1024;
    long nextCallSeq = 100;
    long maxScannerResultSize = 100 * 1024 * 1024;
    long maxBlockBytesScanned = 20 * blockSize;
    long prevBBSDifference = 10 * blockSize;
    long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq,
      maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);

    // growing workload should overestimate (up to maxScannerResultSize)
    assertTrue(nextCallSeq * maxBlockBytesScanned == estimate || maxScannerResultSize == estimate);
  }

  @Test
  public void testScanEstimateShrinkingWorkload() {
    long blockSize = 64 * 1024;
    long nextCallSeq = 100;
    long maxScannerResultSize = 100 * 1024 * 1024;
    long maxBlockBytesScanned = 20 * blockSize;
    long prevBBSDifference = -10 * blockSize;
    long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq,
      maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference);

    // shrinking workload should only shrink estimate to maxBBS
    assertEquals(maxBlockBytesScanned, estimate);
  }

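  /*
   * The batch quota tests below build a TimeBasedLimiter from a single TimedQuota with a
   * per-second soft limit and drive refills by advancing the injected ManualEnvironmentEdge. A
   * batch larger than the soft limit is still admitted rather than being blocked indefinitely,
   * but it drives the limiter into debt, so subsequent requests are rejected and a single
   * TimeUnit refill is not enough to admit another full-limit batch.
   */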
  @Test
  public void testLargeBatchSaturatesReadNumLimit()
    throws RpcThrottlingException, InterruptedException {
    int limit = 10;
    QuotaProtos.Throttle throttle =
      QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder()
        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
    DefaultOperationQuota quota =
      new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter);

    // use the whole limit
    quota.checkBatchQuota(0, limit, false);

    // the next request should be rejected
    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false));

    envEdge.incValue(1000);
    // after the TimeUnit, the limit should be refilled
    quota.checkBatchQuota(0, limit, false);
  }

  @Test
  public void testLargeBatchSaturatesWriteNumLimit()
    throws RpcThrottlingException, InterruptedException {
    int limit = 10;
    QuotaProtos.Throttle throttle =
      QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder()
        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
    DefaultOperationQuota quota =
      new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter);

    // use the whole limit
    quota.checkBatchQuota(limit, 0, false);

    // the next request should be rejected
    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0, false));

    envEdge.incValue(1000);
    // after the TimeUnit, the limit should be refilled
    quota.checkBatchQuota(limit, 0, false);
  }

  @Test
  public void testTooLargeReadBatchIsNotBlocked()
    throws RpcThrottlingException, InterruptedException {
    int limit = 10;
    QuotaProtos.Throttle throttle =
      QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder()
        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
    DefaultOperationQuota quota =
      new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter);

    // use more than the limit, which should succeed rather than being indefinitely blocked
    quota.checkBatchQuota(0, 10 + limit, false);

    // the next request should be rejected
    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false));

    envEdge.incValue(1000);
    // even after the TimeUnit, requests should still be rejected because we oversubscribed
    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, limit, false));
  }

  @Test
  public void testTooLargeWriteBatchIsNotBlocked()
    throws RpcThrottlingException, InterruptedException {
    int limit = 10;
    QuotaProtos.Throttle throttle =
      QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder()
        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
    DefaultOperationQuota quota =
      new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter);

    // use more than the limit, which should succeed rather than being indefinitely blocked
    quota.checkBatchQuota(10 + limit, 0, false);

    // the next request should be rejected
    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0, false));

    envEdge.incValue(1000);
    // even after the TimeUnit, requests should still be rejected because we oversubscribed
    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0, false));
  }

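  /*
   * The size-based tests below rely on the fact that checkBatchQuota charges estimated sizes up
   * front (roughly 100 bytes per write and one block per read in these tests), so a single small
   * operation can already exceed a tight size limit without being blocked.
   */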
  @Test
  public void testTooLargeWriteSizeIsNotBlocked()
    throws RpcThrottlingException, InterruptedException {
    int limit = 50;
    QuotaProtos.Throttle throttle =
      QuotaProtos.Throttle.newBuilder().setWriteSize(QuotaProtos.TimedQuota.newBuilder()
        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
    DefaultOperationQuota quota =
      new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter);

    // writes are estimated at 100 bytes, so this will use 2x the limit but should not be blocked
    quota.checkBatchQuota(1, 0, false);

    // the next request should be rejected
    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0, false));

    envEdge.incValue(1000);
    // even after the TimeUnit, requests should still be rejected because we oversubscribed
    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0, false));
  }

  @Test
  public void testTooLargeReadSizeIsNotBlocked()
    throws RpcThrottlingException, InterruptedException {
    long blockSize = 65536;
    long limit = blockSize / 2;
    QuotaProtos.Throttle throttle =
      QuotaProtos.Throttle.newBuilder().setReadSize(QuotaProtos.TimedQuota.newBuilder()
        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
    DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), (int) blockSize,
      DEFAULT_REQUESTS_PER_SECOND, limiter);

    // reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked
    quota.checkBatchQuota(0, 1, false);

    // the next request should be rejected
    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false));

    envEdge.incValue(1000);
    // even after the TimeUnit, requests should still be rejected because we oversubscribed
    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1, false));
  }

  @Test
  public void testTooLargeRequestSizeIsNotBlocked()
    throws RpcThrottlingException, InterruptedException {
    long blockSize = 65536;
    long limit = blockSize / 2;
    QuotaProtos.Throttle throttle =
      QuotaProtos.Throttle.newBuilder().setReqSize(QuotaProtos.TimedQuota.newBuilder()
        .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
    DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), (int) blockSize,
      DEFAULT_REQUESTS_PER_SECOND, limiter);

    // reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked
    quota.checkBatchQuota(0, 1, false);

    // the next request should be rejected
    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false));

    envEdge.incValue(1000);
    // even after the TimeUnit, requests should still be rejected because we oversubscribed
    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1, false));
  }
}