/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.quotas;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TimeUnit;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.QuotaScope;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.TimedQuota;

/**
 * Helper class to interact with the quota table
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class QuotaUtil extends QuotaTableUtil {
  private static final Logger LOG = LoggerFactory.getLogger(QuotaUtil.class);

  public static final String QUOTA_CONF_KEY = "hbase.quota.enabled";
  private static final boolean QUOTA_ENABLED_DEFAULT = false;

  public static final String READ_CAPACITY_UNIT_CONF_KEY = "hbase.quota.read.capacity.unit";
  // the default size of one read capacity unit is 1024 bytes (1 KB)
  public static final long DEFAULT_READ_CAPACITY_UNIT = 1024;
  public static final String WRITE_CAPACITY_UNIT_CONF_KEY = "hbase.quota.write.capacity.unit";
  // the default size of one write capacity unit is 1024 bytes (1 KB)
  public static final long DEFAULT_WRITE_CAPACITY_UNIT = 1024;

  /*
   * The defaults below, if configured, are applied to otherwise unthrottled users. For example,
   * set `hbase.quota.default.user.machine.read.size` to `1048576` in your hbase-site.xml to ensure
   * that any given user may not read more than 1 MB per second from any given machine, unless a
   * persisted quota explicitly permits more. All of these defaults use TimeUnit.SECONDS and
   * QuotaScope.MACHINE. See the illustrative sketch after these keys.
   */
  public static final String QUOTA_DEFAULT_USER_MACHINE_READ_NUM =
    "hbase.quota.default.user.machine.read.num";
  public static final String QUOTA_DEFAULT_USER_MACHINE_READ_SIZE =
    "hbase.quota.default.user.machine.read.size";
  public static final String QUOTA_DEFAULT_USER_MACHINE_REQUEST_NUM =
    "hbase.quota.default.user.machine.request.num";
  public static final String QUOTA_DEFAULT_USER_MACHINE_REQUEST_SIZE =
    "hbase.quota.default.user.machine.request.size";
  public static final String QUOTA_DEFAULT_USER_MACHINE_WRITE_NUM =
    "hbase.quota.default.user.machine.write.num";
  public static final String QUOTA_DEFAULT_USER_MACHINE_WRITE_SIZE =
    "hbase.quota.default.user.machine.write.size";
  public static final String QUOTA_DEFAULT_USER_MACHINE_ATOMIC_READ_SIZE =
    "hbase.quota.default.user.machine.atomic.read.size";
  public static final String QUOTA_DEFAULT_USER_MACHINE_ATOMIC_REQUEST_NUM =
    "hbase.quota.default.user.machine.atomic.request.num";
  public static final String QUOTA_DEFAULT_USER_MACHINE_ATOMIC_WRITE_SIZE =
    "hbase.quota.default.user.machine.atomic.write.size";
  public static final String QUOTA_DEFAULT_USER_MACHINE_REQUEST_HANDLER_USAGE_MS =
    "hbase.quota.default.user.machine.request.handler.usage.ms";
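
  /*
   * Illustrative sketch only (the limits below are made-up values, not recommendations): the same
   * defaults can also be set programmatically on the configuration instead of hbase-site.xml, e.g.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setLong(QuotaUtil.QUOTA_DEFAULT_USER_MACHINE_READ_SIZE, 1048576L); // 1 MB/sec of reads
   *   conf.setLong(QuotaUtil.QUOTA_DEFAULT_USER_MACHINE_WRITE_NUM, 1000L);    // 1000 writes/sec
   *
   * Quotas explicitly persisted in the quota table still take precedence over these defaults.
   */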

  /** Table descriptor for the quota internal table */
  public static final TableDescriptor QUOTA_TABLE_DESC =
    TableDescriptorBuilder.newBuilder(QUOTA_TABLE_NAME)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(QUOTA_FAMILY_INFO)
        .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.ROW)
        .setMaxVersions(1).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(QUOTA_FAMILY_USAGE)
        .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.ROW)
        .setMaxVersions(1).build())
      .build();

  /** Returns true if quota support is enabled */
  public static boolean isQuotaEnabled(final Configuration conf) {
    return conf.getBoolean(QUOTA_CONF_KEY, QUOTA_ENABLED_DEFAULT);
  }

  /*
   * =========================================================================
   * Quota "settings" helpers
   */
  public static void addTableQuota(final Connection connection, final TableName table,
    final Quotas data) throws IOException {
    addQuotas(connection, getTableRowKey(table), data);
  }

  public static void deleteTableQuota(final Connection connection, final TableName table)
    throws IOException {
    deleteQuotas(connection, getTableRowKey(table));
  }

  public static void addNamespaceQuota(final Connection connection, final String namespace,
    final Quotas data) throws IOException {
    addQuotas(connection, getNamespaceRowKey(namespace), data);
  }

  public static void deleteNamespaceQuota(final Connection connection, final String namespace)
    throws IOException {
    deleteQuotas(connection, getNamespaceRowKey(namespace));
  }

  public static void addUserQuota(final Connection connection, final String user, final Quotas data)
    throws IOException {
    addQuotas(connection, getUserRowKey(user), data);
  }

  public static void addUserQuota(final Connection connection, final String user,
    final TableName table, final Quotas data) throws IOException {
    addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table), data);
  }

  public static void addUserQuota(final Connection connection, final String user,
    final String namespace, final Quotas data) throws IOException {
    addQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserNamespace(namespace),
      data);
  }

  public static void deleteUserQuota(final Connection connection, final String user)
    throws IOException {
    deleteQuotas(connection, getUserRowKey(user));
  }

  public static void deleteUserQuota(final Connection connection, final String user,
    final TableName table) throws IOException {
    deleteQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserTable(table));
  }

  public static void deleteUserQuota(final Connection connection, final String user,
    final String namespace) throws IOException {
    deleteQuotas(connection, getUserRowKey(user), getSettingsQualifierForUserNamespace(namespace));
  }

  public static void addRegionServerQuota(final Connection connection, final String regionServer,
    final Quotas data) throws IOException {
    addQuotas(connection, getRegionServerRowKey(regionServer), data);
  }

  public static void deleteRegionServerQuota(final Connection connection, final String regionServer)
    throws IOException {
    deleteQuotas(connection, getRegionServerRowKey(regionServer));
  }

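  /**
   * Classifies a multi-request {@link ClientProtos.Action} for quota accounting. Actions that
   * carry a mutation are classified by their mutation type; anything else is treated as a GET.
   * @param action       the action extracted from a multi request
   * @param hasCondition whether the enclosing request carries a condition (check-and-mutate)
   */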
  public static OperationQuota.OperationType getQuotaOperationType(ClientProtos.Action action,
    boolean hasCondition) {
    if (action.hasMutation()) {
      return getQuotaOperationType(action.getMutation(), hasCondition);
    }
    return OperationQuota.OperationType.GET;
  }

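  /**
   * Classifies a {@link ClientProtos.MutateRequest} for quota accounting, taking the request's own
   * condition flag into account.
   */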
  public static OperationQuota.OperationType
    getQuotaOperationType(ClientProtos.MutateRequest mutateRequest) {
    return getQuotaOperationType(mutateRequest.getMutation(), mutateRequest.hasCondition());
  }

  private static OperationQuota.OperationType
    getQuotaOperationType(ClientProtos.MutationProto mutationProto, boolean hasCondition) {
    ClientProtos.MutationProto.MutationType mutationType = mutationProto.getMutateType();
    if (
      hasCondition || mutationType == ClientProtos.MutationProto.MutationType.APPEND
        || mutationType == ClientProtos.MutationProto.MutationType.INCREMENT
    ) {
      return OperationQuota.OperationType.CHECK_AND_MUTATE;
    }
    return OperationQuota.OperationType.MUTATE;
  }

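  /**
   * Enables or disables the exceed throttle quota by writing the flag to the quota table. Before
   * the flag can be enabled, the persisted region server throttle quotas are validated: at least
   * one read and one write quota must exist, and all of them must use the SECONDS time unit.
   * @param connection                 connection to re-use
   * @param exceedThrottleQuotaEnabled the new value of the flag
   */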
  protected static void switchExceedThrottleQuota(final Connection connection,
    boolean exceedThrottleQuotaEnabled) throws IOException {
    if (exceedThrottleQuotaEnabled) {
      checkRSQuotaToEnableExceedThrottle(
        getRegionServerQuota(connection, QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY));
    }

    Put put = new Put(getExceedThrottleQuotaRowKey());
    put.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS,
      Bytes.toBytes(exceedThrottleQuotaEnabled));
    doPut(connection, put);
  }

  private static void checkRSQuotaToEnableExceedThrottle(Quotas quotas) throws IOException {
    if (quotas != null && quotas.hasThrottle()) {
      Throttle throttle = quotas.getThrottle();
      // To enable the exceed throttle quota, there must be at least one read (req/read
      // num/size/cu) and one write (req/write num/size/cu) region server throttle quota.
      boolean hasReadQuota = false;
      boolean hasWriteQuota = false;
      if (throttle.hasReqNum() || throttle.hasReqSize() || throttle.hasReqCapacityUnit()) {
        hasReadQuota = true;
        hasWriteQuota = true;
      }
      if (
        !hasReadQuota
          && (throttle.hasReadNum() || throttle.hasReadSize() || throttle.hasReadCapacityUnit())
      ) {
        hasReadQuota = true;
      }
      if (!hasReadQuota) {
        throw new DoNotRetryIOException(
          "Please set at least one read region server quota before enable exceed throttle quota");
      }
      if (
        !hasWriteQuota
          && (throttle.hasWriteNum() || throttle.hasWriteSize() || throttle.hasWriteCapacityUnit())
      ) {
        hasWriteQuota = true;
      }
      if (!hasWriteQuota) {
        throw new DoNotRetryIOException("Please set at least one write region server quota "
          + "before enable exceed throttle quota");
      }
      // To enable the exceed throttle quota, all region server throttle quotas must use the
      // SECONDS time unit. Once previous requests have exceeded their own quota and started
      // consuming the region server quota, a quota in a coarser time unit could take a long time
      // to refill, which would penalize later requests.
      List<Pair<Boolean, TimedQuota>> list =
        Arrays.asList(Pair.newPair(throttle.hasReqNum(), throttle.getReqNum()),
          Pair.newPair(throttle.hasReadNum(), throttle.getReadNum()),
          Pair.newPair(throttle.hasWriteNum(), throttle.getWriteNum()),
          Pair.newPair(throttle.hasReqSize(), throttle.getReqSize()),
          Pair.newPair(throttle.hasReadSize(), throttle.getReadSize()),
          Pair.newPair(throttle.hasWriteSize(), throttle.getWriteSize()),
          Pair.newPair(throttle.hasReqCapacityUnit(), throttle.getReqCapacityUnit()),
          Pair.newPair(throttle.hasReadCapacityUnit(), throttle.getReadCapacityUnit()),
          Pair.newPair(throttle.hasWriteCapacityUnit(), throttle.getWriteCapacityUnit()));
      for (Pair<Boolean, TimedQuota> pair : list) {
        if (pair.getFirst()) {
          if (pair.getSecond().getTimeUnit() != TimeUnit.SECONDS) {
            throw new DoNotRetryIOException("All region server quota must be "
              + "in seconds time unit if enable exceed throttle quota");
          }
        }
      }
    } else {
      // The exceed throttle quota can only be enabled if a region server quota is already set
      throw new DoNotRetryIOException(
        "Please set region server quota before enable exceed throttle quota");
    }
  }

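  /** Returns whether the exceed throttle quota flag is enabled, as persisted in the quota table */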
  protected static boolean isExceedThrottleQuotaEnabled(final Connection connection)
    throws IOException {
    Get get = new Get(getExceedThrottleQuotaRowKey());
    get.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
    Result result = doGet(connection, get);
    if (result.isEmpty()) {
      return false;
    }
    return Bytes.toBoolean(result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS));
  }

  private static void addQuotas(final Connection connection, final byte[] rowKey, final Quotas data)
    throws IOException {
    addQuotas(connection, rowKey, QUOTA_QUALIFIER_SETTINGS, data);
  }

  private static void addQuotas(final Connection connection, final byte[] rowKey,
    final byte[] qualifier, final Quotas data) throws IOException {
    Put put = new Put(rowKey);
    put.addColumn(QUOTA_FAMILY_INFO, qualifier, quotasToData(data));
    doPut(connection, put);
  }

  private static void deleteQuotas(final Connection connection, final byte[] rowKey)
    throws IOException {
    deleteQuotas(connection, rowKey, null);
  }

  private static void deleteQuotas(final Connection connection, final byte[] rowKey,
    final byte[] qualifier) throws IOException {
    Delete delete = new Delete(rowKey);
    if (qualifier != null) {
      delete.addColumns(QUOTA_FAMILY_INFO, qualifier);
    }
    if (isNamespaceRowKey(rowKey)) {
      String ns = getNamespaceFromRowKey(rowKey);
      Quotas namespaceQuota = getNamespaceQuota(connection, ns);
      if (namespaceQuota != null && namespaceQuota.hasSpace()) {
        // When deleting a namespace space quota, also delete the table usage (u:p) snapshots
        deleteTableUsageSnapshotsForNamespace(connection, ns);
      }
    }
    doDelete(connection, delete);
  }

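  /**
   * Fetches the per-user quota states for the given gets against the quota table. Cluster-scoped
   * limits are converted to machine-scoped limits using the supplied factors, and users without a
   * persisted quota fall back to the configured defaults (see the
   * hbase.quota.default.user.machine.* keys above).
   * @param connection               connection to re-use
   * @param gets                     the list of gets, one per user row key
   * @param tableMachineQuotaFactors per-table factors used for user-table quotas
   * @param factor                   the factor used for user and user-namespace quotas
   * @return a map from user name to its {@link UserQuotaState}
   */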
  public static Map<String, UserQuotaState> fetchUserQuotas(final Connection connection,
    final List<Get> gets, Map<TableName, Double> tableMachineQuotaFactors, double factor)
    throws IOException {
    long nowTs = EnvironmentEdgeManager.currentTime();
    Result[] results = doGet(connection, gets);

    Map<String, UserQuotaState> userQuotas = new HashMap<>(results.length);
    for (int i = 0; i < results.length; ++i) {
      byte[] key = gets.get(i).getRow();
      assert isUserRowKey(key);
      String user = getUserFromRowKey(key);

      if (results[i].isEmpty()) {
        userQuotas.put(user, buildDefaultUserQuotaState(connection.getConfiguration(), nowTs));
        continue;
      }

      final UserQuotaState quotaInfo = new UserQuotaState(nowTs);
      userQuotas.put(user, quotaInfo);

      assert Bytes.equals(key, results[i].getRow());

      try {
        parseUserResult(user, results[i], new UserQuotasVisitor() {
          @Override
          public void visitUserQuotas(String userName, String namespace, Quotas quotas) {
            quotas = updateClusterQuotaToMachineQuota(quotas, factor);
            quotaInfo.setQuotas(namespace, quotas);
          }

          @Override
          public void visitUserQuotas(String userName, TableName table, Quotas quotas) {
            quotas = updateClusterQuotaToMachineQuota(quotas,
              tableMachineQuotaFactors.containsKey(table)
                ? tableMachineQuotaFactors.get(table)
                : 1);
            quotaInfo.setQuotas(table, quotas);
          }

          @Override
          public void visitUserQuotas(String userName, Quotas quotas) {
            quotas = updateClusterQuotaToMachineQuota(quotas, factor);
            quotaInfo.setQuotas(quotas);
          }
        });
      } catch (IOException e) {
        LOG.error("Unable to parse user '" + user + "' quotas", e);
        userQuotas.remove(user);
      }
    }
    return userQuotas;
  }

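  /**
   * Builds the {@link UserQuotaState} applied to users that have no persisted quota, using the
   * hbase.quota.default.user.machine.* configuration keys. Keys that are not configured contribute
   * no throttle.
   * @param conf  configuration to read the default limits from
   * @param nowTs timestamp used for the new quota state
   */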
  protected static UserQuotaState buildDefaultUserQuotaState(Configuration conf, long nowTs) {
    QuotaProtos.Throttle.Builder throttleBuilder = QuotaProtos.Throttle.newBuilder();

    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_READ_NUM)
      .ifPresent(throttleBuilder::setReadNum);
    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_READ_SIZE)
      .ifPresent(throttleBuilder::setReadSize);
    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_REQUEST_NUM)
      .ifPresent(throttleBuilder::setReqNum);
    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_REQUEST_SIZE)
      .ifPresent(throttleBuilder::setReqSize);
    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_WRITE_NUM)
      .ifPresent(throttleBuilder::setWriteNum);
    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_WRITE_SIZE)
      .ifPresent(throttleBuilder::setWriteSize);
    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_ATOMIC_READ_SIZE)
      .ifPresent(throttleBuilder::setAtomicReadSize);
    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_ATOMIC_REQUEST_NUM)
      .ifPresent(throttleBuilder::setAtomicReqNum);
    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_ATOMIC_WRITE_SIZE)
      .ifPresent(throttleBuilder::setAtomicWriteSize);
    buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_REQUEST_HANDLER_USAGE_MS)
      .ifPresent(throttleBuilder::setReqHandlerUsageMs);

    UserQuotaState state = new UserQuotaState(nowTs);
    QuotaProtos.Quotas defaultQuotas =
      QuotaProtos.Quotas.newBuilder().setThrottle(throttleBuilder.build()).build();
    state.setQuotas(defaultQuotas);
    return state;
  }

  private static Optional<TimedQuota> buildDefaultTimedQuota(Configuration conf, String key) {
    int defaultSoftLimit = conf.getInt(key, -1);
    if (defaultSoftLimit == -1) {
      return Optional.empty();
    }
    return Optional.of(ProtobufUtil.toTimedQuota(defaultSoftLimit,
      java.util.concurrent.TimeUnit.SECONDS, org.apache.hadoop.hbase.quotas.QuotaScope.MACHINE));
  }

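  /**
   * Fetches the per-table quota states for the given gets, converting cluster-scoped limits to
   * machine-scoped limits with the per-table factors (tables without a factor use 1).
   */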
  public static Map<TableName, QuotaState> fetchTableQuotas(final Connection connection,
    final List<Get> gets, Map<TableName, Double> tableMachineFactors) throws IOException {
    return fetchGlobalQuotas("table", connection, gets, new KeyFromRow<TableName>() {
      @Override
      public TableName getKeyFromRow(final byte[] row) {
        assert isTableRowKey(row);
        return getTableFromRowKey(row);
      }

      @Override
      public double getFactor(TableName tableName) {
        return tableMachineFactors.containsKey(tableName) ? tableMachineFactors.get(tableName) : 1;
      }
    });
  }

  public static Map<String, QuotaState> fetchNamespaceQuotas(final Connection connection,
    final List<Get> gets, double factor) throws IOException {
    return fetchGlobalQuotas("namespace", connection, gets, new KeyFromRow<String>() {
      @Override
      public String getKeyFromRow(final byte[] row) {
        assert isNamespaceRowKey(row);
        return getNamespaceFromRowKey(row);
      }

      @Override
      public double getFactor(String s) {
        return factor;
      }
    });
  }

  public static Map<String, QuotaState> fetchRegionServerQuotas(final Connection connection,
    final List<Get> gets) throws IOException {
    return fetchGlobalQuotas("regionServer", connection, gets, new KeyFromRow<String>() {
      @Override
      public String getKeyFromRow(final byte[] row) {
        assert isRegionServerRowKey(row);
        return getRegionServerFromRowKey(row);
      }

      @Override
      public double getFactor(String s) {
        return 1;
      }
    });
  }

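  /**
   * Generic helper behind the table, namespace and region server fetches: reads the settings
   * column for each row, converts cluster-scoped limits to machine-scoped limits with the factor
   * supplied by the {@link KeyFromRow}, and drops entries whose quotas fail to parse.
   * @param type       a label used only for logging, e.g. "table" or "namespace"
   * @param connection connection to re-use
   * @param gets       the list of gets, one per row key
   * @param kfr        extracts the map key and the machine factor from a row key
   */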
  public static <K> Map<K, QuotaState> fetchGlobalQuotas(final String type,
    final Connection connection, final List<Get> gets, final KeyFromRow<K> kfr) throws IOException {
    long nowTs = EnvironmentEdgeManager.currentTime();
    Result[] results = doGet(connection, gets);

    Map<K, QuotaState> globalQuotas = new HashMap<>(results.length);
    for (int i = 0; i < results.length; ++i) {
      byte[] row = gets.get(i).getRow();
      K key = kfr.getKeyFromRow(row);

      QuotaState quotaInfo = new QuotaState(nowTs);
      globalQuotas.put(key, quotaInfo);

      if (results[i].isEmpty()) continue;
      assert Bytes.equals(row, results[i].getRow());

      byte[] data = results[i].getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
      if (data == null) continue;

      try {
        Quotas quotas = quotasFromData(data);
        quotas = updateClusterQuotaToMachineQuota(quotas, kfr.getFactor(key));
        quotaInfo.setQuotas(quotas);
      } catch (IOException e) {
        LOG.error("Unable to parse " + type + " '" + key + "' quotas", e);
        globalQuotas.remove(key);
      }
    }
    return globalQuotas;
  }

  /**
   * Converts cluster-scoped quotas to machine-scoped quotas.
   * @param quotas the original quota
   * @param factor the factor used to divide a cluster-wide limit into a per-machine limit
   * @return the converted quota, with all of its limiters in machine scope
   */
  private static Quotas updateClusterQuotaToMachineQuota(Quotas quotas, double factor) {
    Quotas.Builder newQuotas = Quotas.newBuilder(quotas);
    if (newQuotas.hasThrottle()) {
      Throttle.Builder throttle = Throttle.newBuilder(newQuotas.getThrottle());
      if (throttle.hasReqNum()) {
        throttle.setReqNum(updateTimedQuota(throttle.getReqNum(), factor));
      }
      if (throttle.hasReqSize()) {
        throttle.setReqSize(updateTimedQuota(throttle.getReqSize(), factor));
      }
      if (throttle.hasReadNum()) {
        throttle.setReadNum(updateTimedQuota(throttle.getReadNum(), factor));
      }
      if (throttle.hasReadSize()) {
        throttle.setReadSize(updateTimedQuota(throttle.getReadSize(), factor));
      }
      if (throttle.hasWriteNum()) {
        throttle.setWriteNum(updateTimedQuota(throttle.getWriteNum(), factor));
      }
      if (throttle.hasWriteSize()) {
        throttle.setWriteSize(updateTimedQuota(throttle.getWriteSize(), factor));
      }
      if (throttle.hasReqCapacityUnit()) {
        throttle.setReqCapacityUnit(updateTimedQuota(throttle.getReqCapacityUnit(), factor));
      }
      if (throttle.hasReadCapacityUnit()) {
        throttle.setReadCapacityUnit(updateTimedQuota(throttle.getReadCapacityUnit(), factor));
      }
      if (throttle.hasWriteCapacityUnit()) {
        throttle.setWriteCapacityUnit(updateTimedQuota(throttle.getWriteCapacityUnit(), factor));
      }
      newQuotas.setThrottle(throttle.build());
    }
    return newQuotas.build();
  }

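  /**
   * Scales a single {@link TimedQuota} from cluster scope to machine scope; machine-scoped quotas
   * are returned unchanged. For example, a CLUSTER-scoped soft limit of 1000 with a factor of 0.25
   * (say, one of four equally weighted region servers) becomes a MACHINE-scoped soft limit of 250;
   * the scaled limit is never rounded below 1.
   */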
  private static TimedQuota updateTimedQuota(TimedQuota timedQuota, double factor) {
    if (timedQuota.getScope() == QuotaScope.CLUSTER) {
      TimedQuota.Builder newTimedQuota = TimedQuota.newBuilder(timedQuota);
      newTimedQuota.setSoftLimit(Math.max(1, (long) (timedQuota.getSoftLimit() * factor)))
        .setScope(QuotaScope.MACHINE);
      return newTimedQuota.build();
    } else {
      return timedQuota;
    }
  }

  private static interface KeyFromRow<T> {
    T getKeyFromRow(final byte[] row);

    double getFactor(T t);
  }

  /*
   * =========================================================================
   * HTable helpers
   */
  private static void doPut(final Connection connection, final Put put) throws IOException {
    try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
      table.put(put);
    }
  }

  private static void doDelete(final Connection connection, final Delete delete)
    throws IOException {
    try (Table table = connection.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
      table.delete(delete);
    }
  }

  /*
   * =========================================================================
   * Data Size Helpers
   */
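  /**
   * Returns the total serialized size, in bytes, of the cells in the given mutation; used to
   * estimate the cost of a write against size-based quotas.
   */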
  public static long calculateMutationSize(final Mutation mutation) {
    long size = 0;
    for (Map.Entry<byte[], List<Cell>> entry : mutation.getFamilyCellMap().entrySet()) {
      for (Cell cell : entry.getValue()) {
        size += cell.getSerializedSize();
      }
    }
    return size;
  }

  public static long calculateResultSize(final Result result) {
    long size = 0;
    for (Cell cell : result.rawCells()) {
      size += cell.getSerializedSize();
    }
    return size;
  }

  public static long calculateResultSize(final List<Result> results) {
    long size = 0;
    for (Result result : results) {
      for (Cell cell : result.rawCells()) {
        size += cell.getSerializedSize();
      }
    }
    return size;
  }

  public static long calculateCellsSize(final List<Cell> cells) {
    long size = 0;
    for (Cell cell : cells) {
      size += cell.getSerializedSize();
    }
    return size;
  }

  /**
   * Enables a table if it is not already enabled. Suppresses {@link TableNotDisabledException} and
   * {@link TableNotFoundException} if thrown while enabling the table.
   * @param conn      connection to re-use
   * @param tableName name of the table to be enabled
   */
  public static void enableTableIfNotEnabled(Connection conn, TableName tableName)
    throws IOException {
    try {
      conn.getAdmin().enableTable(tableName);
    } catch (TableNotDisabledException | TableNotFoundException e) {
      // ignore
    }
  }

  /**
   * Disables a table if it is not already disabled. Suppresses {@link TableNotEnabledException}
   * and {@link TableNotFoundException} if thrown while disabling the table.
   * @param conn      connection to re-use
   * @param tableName table name which has moved into space quota violation
   */
  public static void disableTableIfNotDisabled(Connection conn, TableName tableName)
    throws IOException {
    try {
      conn.getAdmin().disableTable(tableName);
    } catch (TableNotEnabledException | TableNotFoundException e) {
      // ignore
    }
  }
}