001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.quotas;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.Optional;
023import java.util.concurrent.TimeUnit;
024import java.util.function.Supplier;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.client.TableDescriptor;
027import org.apache.hadoop.hbase.ipc.RpcScheduler;
028import org.apache.hadoop.hbase.ipc.RpcServer;
029import org.apache.hadoop.hbase.regionserver.Region;
030import org.apache.hadoop.hbase.regionserver.RegionServerServices;
031import org.apache.hadoop.hbase.security.User;
032import org.apache.hadoop.security.UserGroupInformation;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.apache.yetus.audience.InterfaceStability;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import org.apache.hbase.thirdparty.com.google.common.base.Suppliers;
039
040import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
041
042/**
043 * Region Server Quota Manager. It is responsible to provide access to the quota information of each
044 * user/table. The direct user of this class is the RegionServer that will get and check the
045 * user/table quota for each operation (put, get, scan). For system tables and user/table with a
046 * quota specified, the quota check will be a noop.
047 */
048@InterfaceAudience.Private
049@InterfaceStability.Evolving
050public class RegionServerRpcQuotaManager implements RpcQuotaManager {
051  private static final Logger LOG = LoggerFactory.getLogger(RegionServerRpcQuotaManager.class);
052
053  private final RegionServerServices rsServices;
054
055  private QuotaCache quotaCache = null;
056  private volatile boolean rpcThrottleEnabled;
057  // Storage for quota rpc throttle
058  private RpcThrottleStorage rpcThrottleStorage;
059  private final Supplier<Double> requestsPerSecondSupplier;
060
061  public RegionServerRpcQuotaManager(final RegionServerServices rsServices) {
062    this.rsServices = rsServices;
063    rpcThrottleStorage =
064      new RpcThrottleStorage(rsServices.getZooKeeper(), rsServices.getConfiguration());
065    this.requestsPerSecondSupplier = Suppliers.memoizeWithExpiration(
066      () -> rsServices.getMetrics().getRegionServerWrapper().getRequestsPerSecond(), 1,
067      TimeUnit.MINUTES);
068  }
069
070  public void start(final RpcScheduler rpcScheduler) throws IOException {
071    if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) {
072      LOG.info("Quota support disabled");
073      return;
074    }
075
076    LOG.info("Initializing RPC quota support");
077
078    // Initialize quota cache
079    quotaCache = new QuotaCache(rsServices);
080    quotaCache.start();
081    rpcThrottleEnabled = rpcThrottleStorage.isRpcThrottleEnabled();
082    LOG.info("Start rpc quota manager and rpc throttle enabled is {}", rpcThrottleEnabled);
083  }
084
085  public void stop() {
086    if (isQuotaEnabled()) {
087      quotaCache.stop("shutdown");
088    }
089  }
090
091  protected boolean isRpcThrottleEnabled() {
092    return rpcThrottleEnabled;
093  }
094
095  private boolean isQuotaEnabled() {
096    return quotaCache != null;
097  }
098
099  public void switchRpcThrottle(boolean enable) throws IOException {
100    if (isQuotaEnabled()) {
101      if (rpcThrottleEnabled != enable) {
102        boolean previousEnabled = rpcThrottleEnabled;
103        rpcThrottleEnabled = rpcThrottleStorage.isRpcThrottleEnabled();
104        LOG.info("Switch rpc throttle from {} to {}", previousEnabled, rpcThrottleEnabled);
105      } else {
106        LOG.warn(
107          "Skip switch rpc throttle because previous value {} is the same as current value {}",
108          rpcThrottleEnabled, enable);
109      }
110    } else {
111      LOG.warn("Skip switch rpc throttle to {} because rpc quota is disabled", enable);
112    }
113  }
114
115  QuotaCache getQuotaCache() {
116    return quotaCache;
117  }
118
119  /**
120   * Returns the quota for an operation.
121   * @param ugi   the user that is executing the operation
122   * @param table the table where the operation will be executed
123   * @return the OperationQuota
124   */
125  public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table,
126    final int blockSizeBytes) {
127    if (isQuotaEnabled() && !table.isSystemTable() && isRpcThrottleEnabled()) {
128      UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi);
129      QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table);
130
131      boolean useNoop = userLimiter.isBypass();
132      if (userQuotaState.hasBypassGlobals()) {
133        if (LOG.isTraceEnabled()) {
134          LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter);
135        }
136        if (!useNoop) {
137          return new DefaultOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes,
138            requestsPerSecondSupplier.get(), userLimiter);
139        }
140      } else {
141        QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString());
142        QuotaLimiter tableLimiter = quotaCache.getTableLimiter(table);
143        QuotaLimiter rsLimiter =
144          quotaCache.getRegionServerQuotaLimiter(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY);
145        useNoop &= tableLimiter.isBypass() && nsLimiter.isBypass() && rsLimiter.isBypass();
146        boolean exceedThrottleQuotaEnabled = quotaCache.isExceedThrottleQuotaEnabled();
147        if (LOG.isTraceEnabled()) {
148          LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter
149            + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter + " rsLimiter="
150            + rsLimiter + " exceedThrottleQuotaEnabled=" + exceedThrottleQuotaEnabled);
151        }
152        if (!useNoop) {
153          if (exceedThrottleQuotaEnabled) {
154            return new ExceedOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes,
155              requestsPerSecondSupplier.get(), rsLimiter, userLimiter, tableLimiter, nsLimiter);
156          } else {
157            return new DefaultOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes,
158              requestsPerSecondSupplier.get(), userLimiter, tableLimiter, nsLimiter, rsLimiter);
159          }
160        }
161      }
162    }
163    return NoopOperationQuota.get();
164  }
165
166  @Override
167  public OperationQuota checkScanQuota(final Region region,
168    final ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
169    long maxBlockBytesScanned, long prevBlockBytesScannedDifference)
170    throws IOException, RpcThrottlingException {
171    Optional<User> user = RpcServer.getRequestUser();
172    UserGroupInformation ugi;
173    if (user.isPresent()) {
174      ugi = user.get().getUGI();
175    } else {
176      ugi = User.getCurrent().getUGI();
177    }
178    TableDescriptor tableDescriptor = region.getTableDescriptor();
179    TableName table = tableDescriptor.getTableName();
180
181    OperationQuota quota = getQuota(ugi, table, region.getMinBlockSizeBytes());
182    try {
183      quota.checkScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned,
184        prevBlockBytesScannedDifference);
185    } catch (RpcThrottlingException e) {
186      LOG.debug("Throttling exception for user=" + ugi.getUserName() + " table=" + table + " scan="
187        + scanRequest.getScannerId() + ": " + e.getMessage());
188      throw e;
189    }
190    return quota;
191  }
192
193  @Override
194  public OperationQuota checkBatchQuota(final Region region,
195    final OperationQuota.OperationType type) throws IOException, RpcThrottlingException {
196    switch (type) {
197      case GET:
198        return this.checkBatchQuota(region, 0, 1, false);
199      case MUTATE:
200        return this.checkBatchQuota(region, 1, 0, false);
201      case CHECK_AND_MUTATE:
202        return this.checkBatchQuota(region, 1, 1, true);
203    }
204    throw new RuntimeException("Invalid operation type: " + type);
205  }
206
207  @Override
208  public OperationQuota checkBatchQuota(final Region region,
209    final List<ClientProtos.Action> actions, boolean hasCondition)
210    throws IOException, RpcThrottlingException {
211    int numWrites = 0;
212    int numReads = 0;
213    boolean isAtomic = false;
214    for (final ClientProtos.Action action : actions) {
215      if (action.hasMutation()) {
216        numWrites++;
217        OperationQuota.OperationType operationType =
218          QuotaUtil.getQuotaOperationType(action, hasCondition);
219        if (operationType == OperationQuota.OperationType.CHECK_AND_MUTATE) {
220          numReads++;
221          // If any mutations in this batch are atomic, we will count the entire batch as atomic.
222          // This is a conservative approach, but it is the best that we can do without knowing
223          // the block bytes scanned of each individual action.
224          isAtomic = true;
225        }
226      } else if (action.hasGet()) {
227        numReads++;
228      }
229    }
230    return checkBatchQuota(region, numWrites, numReads, isAtomic);
231  }
232
233  /**
234   * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
235   * available quota and to report the data/usage of the operation.
236   * @param region    the region where the operation will be performed
237   * @param numWrites number of writes to perform
238   * @param numReads  number of short-reads to perform
239   * @return the OperationQuota
240   * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
241   */
242  @Override
243  public OperationQuota checkBatchQuota(final Region region, final int numWrites,
244    final int numReads, boolean isAtomic) throws IOException, RpcThrottlingException {
245    Optional<User> user = RpcServer.getRequestUser();
246    UserGroupInformation ugi;
247    if (user.isPresent()) {
248      ugi = user.get().getUGI();
249    } else {
250      ugi = User.getCurrent().getUGI();
251    }
252    TableDescriptor tableDescriptor = region.getTableDescriptor();
253    TableName table = tableDescriptor.getTableName();
254
255    OperationQuota quota = getQuota(ugi, table, region.getMinBlockSizeBytes());
256    try {
257      quota.checkBatchQuota(numWrites, numReads, isAtomic);
258    } catch (RpcThrottlingException e) {
259      LOG.debug("Throttling exception for user=" + ugi.getUserName() + " table=" + table
260        + " numWrites=" + numWrites + " numReads=" + numReads + ": " + e.getMessage());
261      throw e;
262    }
263    return quota;
264  }
265}