/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.client;
019
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
046
047@Category({ MediumTests.class, ClientTests.class })
048public class TestBufferedMutator {
049
050  @ClassRule
051  public static final HBaseClassTestRule CLASS_RULE =
052    HBaseClassTestRule.forClass(TestBufferedMutator.class);
053
054  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
055
056  private static TableName TABLE_NAME = TableName.valueOf("test");
057
058  private static byte[] CF = Bytes.toBytes("cf");
059
060  private static byte[] CQ = Bytes.toBytes("cq");
061
062  private static byte[] VALUE = new byte[1024];
063
064  @BeforeClass
065  public static void setUp() throws Exception {
066    TEST_UTIL.startMiniCluster(1);
067    ThreadLocalRandom.current().nextBytes(VALUE);
068  }
069
070  @AfterClass
071  public static void tearDown() throws Exception {
072    TEST_UTIL.shutdownMiniCluster();
073  }
074
075  @Before
076  public void setUpBeforeTest() throws IOException {
077    TEST_UTIL.createTable(TABLE_NAME, CF);
078  }
079
080  @After
081  public void tearDownAfterTest() throws IOException {
082    TEST_UTIL.deleteTable(TABLE_NAME);
083  }
084
085  @Test
086  public void test() throws Exception {
087    int count = 1024;
088    try (BufferedMutator mutator = TEST_UTIL.getConnection()
089      .getBufferedMutator(new BufferedMutatorParams(TABLE_NAME).writeBufferSize(64 * 1024))) {
090      mutator.mutate(IntStream.range(0, count / 2)
091        .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
092        .collect(Collectors.toList()));
093      mutator.flush();
094      mutator.mutate(IntStream.range(count / 2, count)
095        .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
096        .collect(Collectors.toList()));
097      mutator.close();
098      verifyData(count);
099    }
100  }
101
102  @Test
103  public void testMultiThread() throws Exception {
104    ExecutorService executor =
105      Executors.newFixedThreadPool(16, new ThreadFactoryBuilder().setDaemon(true).build());
106    // use a greater count and less write buffer size to trigger auto flush when mutate
107    int count = 16384;
108    try (BufferedMutator mutator = TEST_UTIL.getConnection()
109      .getBufferedMutator(new BufferedMutatorParams(TABLE_NAME).writeBufferSize(4 * 1024))) {
110      IntStream.range(0, count / 2)
111        .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
112        .forEach(put -> executor.execute(() -> {
113          try {
114            mutator.mutate(put);
115          } catch (IOException e) {
116            fail("failed to mutate: " + e.getMessage());
117          }
118        }));
119      mutator.flush();
120      IntStream.range(count / 2, count)
121        .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
122        .forEach(put -> executor.execute(() -> {
123          try {
124            mutator.mutate(put);
125          } catch (IOException e) {
126            fail("failed to mutate: " + e.getMessage());
127          }
128        }));
129      executor.shutdown();
130      assertTrue(executor.awaitTermination(15, TimeUnit.SECONDS));
131      mutator.close();
132    } finally {
133      executor.shutdownNow();
134    }
135    verifyData(count);
136  }
137
138  private void verifyData(int count) throws IOException {
139    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
140      for (int i = 0; i < count; i++) {
141        Result r = table.get(new Get(Bytes.toBytes(i)));
142        assertArrayEquals(VALUE, ((Result) r).getValue(CF, CQ));
143      }
144    }
145  }
146}