001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.quotas; 019 020import java.io.IOException; 021import java.util.List; 022import java.util.Optional; 023import java.util.concurrent.TimeUnit; 024import java.util.function.Supplier; 025import org.apache.hadoop.hbase.TableName; 026import org.apache.hadoop.hbase.client.TableDescriptor; 027import org.apache.hadoop.hbase.ipc.RpcScheduler; 028import org.apache.hadoop.hbase.ipc.RpcServer; 029import org.apache.hadoop.hbase.regionserver.Region; 030import org.apache.hadoop.hbase.regionserver.RegionServerServices; 031import org.apache.hadoop.hbase.security.User; 032import org.apache.hadoop.security.UserGroupInformation; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.apache.yetus.audience.InterfaceStability; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038import org.apache.hbase.thirdparty.com.google.common.base.Suppliers; 039 040import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 041 042/** 043 * Region Server Quota Manager. It is responsible to provide access to the quota information of each 044 * user/table. The direct user of this class is the RegionServer that will get and check the 045 * user/table quota for each operation (put, get, scan). For system tables and user/table with a 046 * quota specified, the quota check will be a noop. 047 */ 048@InterfaceAudience.Private 049@InterfaceStability.Evolving 050public class RegionServerRpcQuotaManager implements RpcQuotaManager { 051 private static final Logger LOG = LoggerFactory.getLogger(RegionServerRpcQuotaManager.class); 052 053 private final RegionServerServices rsServices; 054 055 private QuotaCache quotaCache = null; 056 private volatile boolean rpcThrottleEnabled; 057 // Storage for quota rpc throttle 058 private RpcThrottleStorage rpcThrottleStorage; 059 private final Supplier<Double> requestsPerSecondSupplier; 060 061 public RegionServerRpcQuotaManager(final RegionServerServices rsServices) { 062 this.rsServices = rsServices; 063 rpcThrottleStorage = 064 new RpcThrottleStorage(rsServices.getZooKeeper(), rsServices.getConfiguration()); 065 this.requestsPerSecondSupplier = Suppliers.memoizeWithExpiration( 066 () -> rsServices.getMetrics().getRegionServerWrapper().getRequestsPerSecond(), 1, 067 TimeUnit.MINUTES); 068 } 069 070 public void start(final RpcScheduler rpcScheduler) throws IOException { 071 if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) { 072 LOG.info("Quota support disabled"); 073 return; 074 } 075 076 LOG.info("Initializing RPC quota support"); 077 078 // Initialize quota cache 079 quotaCache = new QuotaCache(rsServices); 080 quotaCache.start(); 081 rpcThrottleEnabled = rpcThrottleStorage.isRpcThrottleEnabled(); 082 LOG.info("Start rpc quota manager and rpc throttle enabled is {}", rpcThrottleEnabled); 083 } 084 085 public void stop() { 086 if (isQuotaEnabled()) { 087 quotaCache.stop("shutdown"); 088 } 089 } 090 091 protected boolean isRpcThrottleEnabled() { 092 return rpcThrottleEnabled; 093 } 094 095 private boolean isQuotaEnabled() { 096 return quotaCache != null; 097 } 098 099 public void switchRpcThrottle(boolean enable) throws IOException { 100 if (isQuotaEnabled()) { 101 if (rpcThrottleEnabled != enable) { 102 boolean previousEnabled = rpcThrottleEnabled; 103 rpcThrottleEnabled = rpcThrottleStorage.isRpcThrottleEnabled(); 104 LOG.info("Switch rpc throttle from {} to {}", previousEnabled, rpcThrottleEnabled); 105 } else { 106 LOG.warn( 107 "Skip switch rpc throttle because previous value {} is the same as current value {}", 108 rpcThrottleEnabled, enable); 109 } 110 } else { 111 LOG.warn("Skip switch rpc throttle to {} because rpc quota is disabled", enable); 112 } 113 } 114 115 QuotaCache getQuotaCache() { 116 return quotaCache; 117 } 118 119 /** 120 * Returns the quota for an operation. 121 * @param ugi the user that is executing the operation 122 * @param table the table where the operation will be executed 123 * @return the OperationQuota 124 */ 125 public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table, 126 final int blockSizeBytes) { 127 if (isQuotaEnabled() && !table.isSystemTable() && isRpcThrottleEnabled()) { 128 UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi); 129 QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table); 130 131 boolean useNoop = userLimiter.isBypass(); 132 if (userQuotaState.hasBypassGlobals()) { 133 if (LOG.isTraceEnabled()) { 134 LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter); 135 } 136 if (!useNoop) { 137 return new DefaultOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes, 138 requestsPerSecondSupplier.get(), userLimiter); 139 } 140 } else { 141 QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString()); 142 QuotaLimiter tableLimiter = quotaCache.getTableLimiter(table); 143 QuotaLimiter rsLimiter = 144 quotaCache.getRegionServerQuotaLimiter(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY); 145 useNoop &= tableLimiter.isBypass() && nsLimiter.isBypass() && rsLimiter.isBypass(); 146 boolean exceedThrottleQuotaEnabled = quotaCache.isExceedThrottleQuotaEnabled(); 147 if (LOG.isTraceEnabled()) { 148 LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter 149 + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter + " rsLimiter=" 150 + rsLimiter + " exceedThrottleQuotaEnabled=" + exceedThrottleQuotaEnabled); 151 } 152 if (!useNoop) { 153 if (exceedThrottleQuotaEnabled) { 154 return new ExceedOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes, 155 requestsPerSecondSupplier.get(), rsLimiter, userLimiter, tableLimiter, nsLimiter); 156 } else { 157 return new DefaultOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes, 158 requestsPerSecondSupplier.get(), userLimiter, tableLimiter, nsLimiter, rsLimiter); 159 } 160 } 161 } 162 } 163 return NoopOperationQuota.get(); 164 } 165 166 @Override 167 public OperationQuota checkScanQuota(final Region region, 168 final ClientProtos.ScanRequest scanRequest, long maxScannerResultSize, 169 long maxBlockBytesScanned, long prevBlockBytesScannedDifference) 170 throws IOException, RpcThrottlingException { 171 Optional<User> user = RpcServer.getRequestUser(); 172 UserGroupInformation ugi; 173 if (user.isPresent()) { 174 ugi = user.get().getUGI(); 175 } else { 176 ugi = User.getCurrent().getUGI(); 177 } 178 TableDescriptor tableDescriptor = region.getTableDescriptor(); 179 TableName table = tableDescriptor.getTableName(); 180 181 OperationQuota quota = getQuota(ugi, table, region.getMinBlockSizeBytes()); 182 try { 183 quota.checkScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned, 184 prevBlockBytesScannedDifference); 185 } catch (RpcThrottlingException e) { 186 LOG.debug("Throttling exception for user=" + ugi.getUserName() + " table=" + table + " scan=" 187 + scanRequest.getScannerId() + ": " + e.getMessage()); 188 throw e; 189 } 190 return quota; 191 } 192 193 @Override 194 public OperationQuota checkBatchQuota(final Region region, 195 final OperationQuota.OperationType type) throws IOException, RpcThrottlingException { 196 switch (type) { 197 case GET: 198 return this.checkBatchQuota(region, 0, 1, false); 199 case MUTATE: 200 return this.checkBatchQuota(region, 1, 0, false); 201 case CHECK_AND_MUTATE: 202 return this.checkBatchQuota(region, 1, 1, true); 203 } 204 throw new RuntimeException("Invalid operation type: " + type); 205 } 206 207 @Override 208 public OperationQuota checkBatchQuota(final Region region, 209 final List<ClientProtos.Action> actions, boolean hasCondition) 210 throws IOException, RpcThrottlingException { 211 int numWrites = 0; 212 int numReads = 0; 213 boolean isAtomic = false; 214 for (final ClientProtos.Action action : actions) { 215 if (action.hasMutation()) { 216 numWrites++; 217 OperationQuota.OperationType operationType = 218 QuotaUtil.getQuotaOperationType(action, hasCondition); 219 if (operationType == OperationQuota.OperationType.CHECK_AND_MUTATE) { 220 numReads++; 221 // If any mutations in this batch are atomic, we will count the entire batch as atomic. 222 // This is a conservative approach, but it is the best that we can do without knowing 223 // the block bytes scanned of each individual action. 224 isAtomic = true; 225 } 226 } else if (action.hasGet()) { 227 numReads++; 228 } 229 } 230 return checkBatchQuota(region, numWrites, numReads, isAtomic); 231 } 232 233 /** 234 * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the 235 * available quota and to report the data/usage of the operation. 236 * @param region the region where the operation will be performed 237 * @param numWrites number of writes to perform 238 * @param numReads number of short-reads to perform 239 * @return the OperationQuota 240 * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. 241 */ 242 @Override 243 public OperationQuota checkBatchQuota(final Region region, final int numWrites, 244 final int numReads, boolean isAtomic) throws IOException, RpcThrottlingException { 245 Optional<User> user = RpcServer.getRequestUser(); 246 UserGroupInformation ugi; 247 if (user.isPresent()) { 248 ugi = user.get().getUGI(); 249 } else { 250 ugi = User.getCurrent().getUGI(); 251 } 252 TableDescriptor tableDescriptor = region.getTableDescriptor(); 253 TableName table = tableDescriptor.getTableName(); 254 255 OperationQuota quota = getQuota(ugi, table, region.getMinBlockSizeBytes()); 256 try { 257 quota.checkBatchQuota(numWrites, numReads, isAtomic); 258 } catch (RpcThrottlingException e) { 259 LOG.debug("Throttling exception for user=" + ugi.getUserName() + " table=" + table 260 + " numWrites=" + numWrites + " numReads=" + numReads + ": " + e.getMessage()); 261 throw e; 262 } 263 return quota; 264 } 265}