001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertThrows; 022import static org.junit.Assert.assertTrue; 023 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.HBaseClassTestRule; 026import org.apache.hadoop.hbase.testclassification.RegionServerTests; 027import org.apache.hadoop.hbase.testclassification.SmallTests; 028import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 029import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; 030import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; 031import org.junit.ClassRule; 032import org.junit.Test; 033import org.junit.experimental.categories.Category; 034 035import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 036import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos; 037 038@Category({ RegionServerTests.class, SmallTests.class }) 039public class TestDefaultOperationQuota { 040 @ClassRule 041 public static final HBaseClassTestRule CLASS_RULE = 042 HBaseClassTestRule.forClass(TestDefaultOperationQuota.class); 043 044 private static final int DEFAULT_REQUESTS_PER_SECOND = 1000; 045 private static ManualEnvironmentEdge envEdge = new ManualEnvironmentEdge(); 046 static { 047 envEdge.setValue(EnvironmentEdgeManager.currentTime()); 048 // only active the envEdge for quotas package 049 EnvironmentEdgeManagerTestHelper.injectEdgeForPackage(envEdge, 050 ThrottleQuotaTestUtil.class.getPackage().getName()); 051 } 052 053 @Test 054 public void testScanEstimateNewScanner() { 055 long blockSize = 64 * 1024; 056 long nextCallSeq = 0; 057 long maxScannerResultSize = 100 * 1024 * 1024; 058 long maxBlockBytesScanned = 0; 059 long prevBBSDifference = 0; 060 long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq, 061 maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); 062 063 // new scanner should estimate scan read as 1 block 064 assertEquals(blockSize, estimate); 065 } 066 067 @Test 068 public void testScanEstimateSecondNextCall() { 069 long blockSize = 64 * 1024; 070 long nextCallSeq = 1; 071 long maxScannerResultSize = 100 * 1024 * 1024; 072 long maxBlockBytesScanned = 10 * blockSize; 073 long prevBBSDifference = 10 * blockSize; 074 long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq, 075 maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); 076 077 // 2nd next call should be estimated at maxBBS 078 assertEquals(maxBlockBytesScanned, estimate); 079 } 080 081 @Test 082 public void testScanEstimateFlatWorkload() { 083 long blockSize = 64 * 1024; 084 long nextCallSeq = 100; 085 long maxScannerResultSize = 100 * 1024 * 1024; 086 long maxBlockBytesScanned = 10 * blockSize; 087 long prevBBSDifference = 0; 088 long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq, 089 maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); 090 091 // flat workload should not overestimate 092 assertEquals(maxBlockBytesScanned, estimate); 093 } 094 095 @Test 096 public void testScanEstimateVariableFlatWorkload() { 097 long blockSize = 64 * 1024; 098 long nextCallSeq = 1; 099 long maxScannerResultSize = 100 * 1024 * 1024; 100 long maxBlockBytesScanned = 10 * blockSize; 101 long prevBBSDifference = 0; 102 for (int i = 0; i < 100; i++) { 103 long variation = Math.round(Math.random() * blockSize); 104 if (variation % 2 == 0) { 105 variation *= -1; 106 } 107 // despite +/- <1 block variation, we consider this workload flat 108 prevBBSDifference = variation; 109 110 long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq + i, 111 maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); 112 113 // flat workload should not overestimate 114 assertEquals(maxBlockBytesScanned, estimate); 115 } 116 } 117 118 @Test 119 public void testScanEstimateGrowingWorkload() { 120 long blockSize = 64 * 1024; 121 long nextCallSeq = 100; 122 long maxScannerResultSize = 100 * 1024 * 1024; 123 long maxBlockBytesScanned = 20 * blockSize; 124 long prevBBSDifference = 10 * blockSize; 125 long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq, 126 maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); 127 128 // growing workload should overestimate 129 assertTrue(nextCallSeq * maxBlockBytesScanned == estimate || maxScannerResultSize == estimate); 130 } 131 132 @Test 133 public void testScanEstimateShrinkingWorkload() { 134 long blockSize = 64 * 1024; 135 long nextCallSeq = 100; 136 long maxScannerResultSize = 100 * 1024 * 1024; 137 long maxBlockBytesScanned = 20 * blockSize; 138 long prevBBSDifference = -10 * blockSize; 139 long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq, 140 maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); 141 142 // shrinking workload should only shrink estimate to maxBBS 143 assertEquals(maxBlockBytesScanned, estimate); 144 } 145 146 @Test 147 public void testLargeBatchSaturatesReadNumLimit() 148 throws RpcThrottlingException, InterruptedException { 149 int limit = 10; 150 QuotaProtos.Throttle throttle = 151 QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder() 152 .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); 153 QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); 154 DefaultOperationQuota quota = 155 new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter); 156 157 // use the whole limit 158 quota.checkBatchQuota(0, limit, false); 159 160 // the next request should be rejected 161 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false)); 162 163 envEdge.incValue(1000); 164 // after the TimeUnit, the limit should be refilled 165 quota.checkBatchQuota(0, limit, false); 166 } 167 168 @Test 169 public void testLargeBatchSaturatesReadWriteLimit() 170 throws RpcThrottlingException, InterruptedException { 171 int limit = 10; 172 QuotaProtos.Throttle throttle = 173 QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder() 174 .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); 175 QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); 176 DefaultOperationQuota quota = 177 new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter); 178 179 // use the whole limit 180 quota.checkBatchQuota(limit, 0, false); 181 182 // the next request should be rejected 183 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0, false)); 184 185 envEdge.incValue(1000); 186 // after the TimeUnit, the limit should be refilled 187 quota.checkBatchQuota(limit, 0, false); 188 } 189 190 @Test 191 public void testTooLargeReadBatchIsNotBlocked() 192 throws RpcThrottlingException, InterruptedException { 193 int limit = 10; 194 QuotaProtos.Throttle throttle = 195 QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder() 196 .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); 197 QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); 198 DefaultOperationQuota quota = 199 new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter); 200 201 // use more than the limit, which should succeed rather than being indefinitely blocked 202 quota.checkBatchQuota(0, 10 + limit, false); 203 204 // the next request should be blocked 205 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false)); 206 207 envEdge.incValue(1000); 208 // even after the TimeUnit, the limit should not be refilled because we oversubscribed 209 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, limit, false)); 210 } 211 212 @Test 213 public void testTooLargeWriteBatchIsNotBlocked() 214 throws RpcThrottlingException, InterruptedException { 215 int limit = 10; 216 QuotaProtos.Throttle throttle = 217 QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder() 218 .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); 219 QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); 220 DefaultOperationQuota quota = 221 new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter); 222 223 // use more than the limit, which should succeed rather than being indefinitely blocked 224 quota.checkBatchQuota(10 + limit, 0, false); 225 226 // the next request should be blocked 227 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0, false)); 228 229 envEdge.incValue(1000); 230 // even after the TimeUnit, the limit should not be refilled because we oversubscribed 231 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0, false)); 232 } 233 234 @Test 235 public void testTooLargeWriteSizeIsNotBlocked() 236 throws RpcThrottlingException, InterruptedException { 237 int limit = 50; 238 QuotaProtos.Throttle throttle = 239 QuotaProtos.Throttle.newBuilder().setWriteSize(QuotaProtos.TimedQuota.newBuilder() 240 .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); 241 QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); 242 DefaultOperationQuota quota = 243 new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter); 244 245 // writes are estimated a 100 bytes, so this will use 2x the limit but should not be blocked 246 quota.checkBatchQuota(1, 0, false); 247 248 // the next request should be blocked 249 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0, false)); 250 251 envEdge.incValue(1000); 252 // even after the TimeUnit, the limit should not be refilled because we oversubscribed 253 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0, false)); 254 } 255 256 @Test 257 public void testTooLargeReadSizeIsNotBlocked() 258 throws RpcThrottlingException, InterruptedException { 259 long blockSize = 65536; 260 long limit = blockSize / 2; 261 QuotaProtos.Throttle throttle = 262 QuotaProtos.Throttle.newBuilder().setReadSize(QuotaProtos.TimedQuota.newBuilder() 263 .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); 264 QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); 265 DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), (int) blockSize, 266 DEFAULT_REQUESTS_PER_SECOND, limiter); 267 268 // reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked 269 quota.checkBatchQuota(0, 1, false); 270 271 // the next request should be blocked 272 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false)); 273 274 envEdge.incValue(1000); 275 // even after the TimeUnit, the limit should not be refilled because we oversubscribed 276 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1, false)); 277 } 278 279 @Test 280 public void testTooLargeRequestSizeIsNotBlocked() 281 throws RpcThrottlingException, InterruptedException { 282 long blockSize = 65536; 283 long limit = blockSize / 2; 284 QuotaProtos.Throttle throttle = 285 QuotaProtos.Throttle.newBuilder().setReqSize(QuotaProtos.TimedQuota.newBuilder() 286 .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); 287 QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); 288 DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), (int) blockSize, 289 DEFAULT_REQUESTS_PER_SECOND, limiter); 290 291 // reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked 292 quota.checkBatchQuota(0, 1, false); 293 294 // the next request should be blocked 295 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false)); 296 297 envEdge.incValue(1000); 298 // even after the TimeUnit, the limit should not be refilled because we oversubscribed 299 assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1, false)); 300 } 301}