/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.verify;

import java.io.IOException;
import java.lang.reflect.Field;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HadoopShims;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionRegistry;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Hbck;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Simple test for {@link HFileOutputFormat2}. Sets up and runs a mapreduce job that writes hfile
 * output. Creates a few inner classes to implement splits and an inputformat that emits keys and
 * values.
 */
@Category({ VerySlowMapReduceTests.class, LargeTests.class })
public class TestHFileOutputFormat2 {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestHFileOutputFormat2.class);

  private final static int ROWSPERSPLIT = 1024;
  private static final int DEFAULT_VALUE_LENGTH = 1000;

  public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME;
  private static final byte[][] FAMILIES =
    { Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B")) };
  private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2", "TestTable3")
    .map(TableName::valueOf).toArray(TableName[]::new);

  private HBaseTestingUtil util = new HBaseTestingUtil();

  private static final Logger LOG = LoggerFactory.getLogger(TestHFileOutputFormat2.class);

  /**
   * Simple mapper that makes KeyValue output.
   */
  static class RandomKVGeneratingMapper
    extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> {

    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final byte[] QUALIFIER = Bytes.toBytes("data");
    private boolean multiTableMapper = false;
    private TableName[] tables = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
      multiTableMapper =
        conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
      if (multiTableMapper) {
        tables = TABLE_NAMES;
      } else {
        tables = new TableName[] { TABLE_NAMES[0] };
      }
    }

    @Override
    protected void map(NullWritable n1, NullWritable n2,
      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context)
      throws java.io.IOException, InterruptedException {

      byte keyBytes[] = new byte[keyLength];
      byte valBytes[] = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
      byte[] key;
      for (int j = 0; j < tables.length; ++j) {
        for (int i = 0; i < ROWSPERSPLIT; i++) {
          Bytes.random(keyBytes);
          // Ensure that unique tasks generate unique keys
          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
          Bytes.random(valBytes);
          key = keyBytes;
          if (multiTableMapper) {
            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
          }

          for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
            Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
            context.write(new ImmutableBytesWritable(key), kv);
          }
        }
      }
    }
  }

  /**
   * Simple mapper that makes Put output.
   */
  static class RandomPutGeneratingMapper
    extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put> {

    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final byte[] QUALIFIER = Bytes.toBytes("data");
    private boolean multiTableMapper = false;
    private TableName[] tables = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
      multiTableMapper =
        conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
      if (multiTableMapper) {
        tables = TABLE_NAMES;
      } else {
        tables = new TableName[] { TABLE_NAMES[0] };
      }
    }

    @Override
    protected void map(NullWritable n1, NullWritable n2,
      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put>.Context context)
      throws java.io.IOException, InterruptedException {

      byte keyBytes[] = new byte[keyLength];
      byte valBytes[] = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";

      byte[] key;
      for (int j = 0; j < tables.length; ++j) {
        for (int i = 0; i < ROWSPERSPLIT; i++) {
          Bytes.random(keyBytes);
          // Ensure that unique tasks generate unique keys
          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
          Bytes.random(valBytes);
          key = keyBytes;
          if (multiTableMapper) {
            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
          }

          for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
            Put p = new Put(keyBytes);
            p.addColumn(family, QUALIFIER, valBytes);
            // set TTL to very low so that the scan does not return any value
            p.setTTL(1L);
            context.write(new ImmutableBytesWritable(key), p);
          }
        }
      }
    }
  }

  private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) {
    if (putSortReducer) {
      job.setInputFormatClass(NMapInputFormat.class);
      job.setMapperClass(RandomPutGeneratingMapper.class);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(Put.class);
    } else {
      job.setInputFormatClass(NMapInputFormat.class);
      job.setMapperClass(RandomKVGeneratingMapper.class);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(KeyValue.class);
    }
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if passed a keyvalue whose
   * timestamp is {@link HConstants#LATEST_TIMESTAMP}.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a>
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void test_LATEST_TIMESTAMP_isReplaced() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
    try {
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be
      // changed by call to write. Check all in kv is same but ts.
      KeyValue kv = new KeyValue(b, b, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertFalse(original.equals(kv));
      assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv)));
      assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv)));
      assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv)));
      assertNotSame(original.getTimestamp(), kv.getTimestamp());
      assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp());

      // Test 2. Now test passing a kv that has explicit ts. It should not be
      // changed by call to record write.
      kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertTrue(original.equals(kv));
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  private TaskAttemptContext createTestTaskAttemptContext(final Job job) throws Exception {
    HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
    TaskAttemptContext context =
      hadoop.createTestTaskAttemptContext(job, "attempt_201402131733_0001_m_000000_0");
    return context;
  }

  /*
   * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE metadata used by
   * time-restricted scans.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void test_TIMERANGE() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("test_TIMERANGE_present");
    LOG.info("Timerange dir writing to dir: " + dir);
    try {
      // build a record writer using HFileOutputFormat2
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      // Pass two key values with explicit timestamps
      final byte[] b = Bytes.toBytes("b");

      // value 1 with timestamp 2000
      KeyValue kv = new KeyValue(b, b, b, 2000, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // value 2 with timestamp 1000
      kv = new KeyValue(b, b, b, 1000, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // verify that the file has the proper FileInfo.
      writer.close(context);

      // the generated file lives 1 directory down from the attempt directory
      // and is the only file, e.g.
      // _attempt__0000_r_000000_0/b/1979617994050536795
      FileSystem fs = FileSystem.get(conf);
      Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent();
      FileStatus[] sub1 = fs.listStatus(attemptDirectory);
      FileStatus[] file = fs.listStatus(sub1[0].getPath());

      // open as HFile Reader and pull out TIMERANGE FileInfo.
      HFile.Reader rd =
        HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf);
      Map<byte[], byte[]> finfo = rd.getHFileInfo();
      byte[] range = finfo.get(Bytes.toBytes("TIMERANGE"));
      assertNotNull(range);

      // unmarshall and check values.
      TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(range);
      LOG.info(timeRangeTracker.getMin() + "...." + timeRangeTracker.getMax());
      assertEquals(1000, timeRangeTracker.getMin());
      assertEquals(2000, timeRangeTracker.getMax());
      rd.close();
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  /**
   * Run small MR job.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testWritingPEData() throws Exception {
    Configuration conf = util.getConfiguration();
    Path testDir = util.getDataTestDirOnTestFS("testWritingPEData");
    FileSystem fs = testDir.getFileSystem(conf);

    // Set down this value or we OOME in eclipse.
    conf.setInt("mapreduce.task.io.sort.mb", 20);
    // Write a few files.
    long hregionMaxFilesize = 10 * 1024;
    conf.setLong(HConstants.HREGION_MAX_FILESIZE, hregionMaxFilesize);

    Job job = new Job(conf, "testWritingPEData");
    setupRandomGeneratorMapper(job, false);
    // This partitioner doesn't work well for numeric keys, but we use it anyway
    // just to demonstrate how to configure it.
    byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
    byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];

    Arrays.fill(startKey, (byte) 0);
    Arrays.fill(endKey, (byte) 0xff);

    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
    // Set start and end rows for partitioner.
    SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
    SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
    job.setReducerClass(CellSortReducer.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);
    job.setNumReduceTasks(4);
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
      CellSerialization.class.getName());

    FileOutputFormat.setOutputPath(job, testDir);
    assertTrue(job.waitForCompletion(false));
    FileStatus[] files = fs.listStatus(testDir);
    assertTrue(files.length > 0);

    // check output file num and size.
    for (byte[] family : FAMILIES) {
      long kvCount = 0;
      RemoteIterator<LocatedFileStatus> iterator =
        fs.listFiles(testDir.suffix("/" + new String(family)), true);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        HFile.Reader reader =
          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
        HFileScanner scanner = reader.getScanner(conf, false, false, false);

        kvCount += reader.getEntries();
        scanner.seekTo();
        long perKVSize = scanner.getCell().getSerializedSize();
        assertTrue("Data size of each file should not be too large.",
          perKVSize * reader.getEntries() <= hregionMaxFilesize);
      }
      assertEquals("Should write expected data in output file.", ROWSPERSPLIT, kvCount);
    }
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as ttl into hfile.
   */
  @Test
  public void test_WritingTagData() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version";
    conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("WritingTagData");
    try {
      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      List<Tag> tags = new ArrayList<>();
      tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670)));
      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags);
      writer.write(new ImmutableBytesWritable(), kv);
      writer.close(context);
      writer = null;
      FileSystem fs = dir.getFileSystem(conf);
      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        HFile.Reader reader =
          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
        HFileScanner scanner = reader.getScanner(conf, false, false, false);
        scanner.seekTo();
        ExtendedCell cell = scanner.getCell();
        List<Tag> tagsFromCell = PrivateCellUtil.getTags(cell);
        assertTrue(tagsFromCell.size() > 0);
        for (Tag tag : tagsFromCell) {
          assertTrue(tag.getType() == TagType.TTL_TAG_TYPE);
        }
      }
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testJobConfiguration() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
      util.getDataTestDir("testJobConfiguration").toString());
    Job job = new Job(conf);
    job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
    Table table = Mockito.mock(Table.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    setupMockStartKeys(regionLocator);
    setupMockTableName(regionLocator);
    HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
    assertEquals(4, job.getNumReduceTasks());
  }

  private byte[][] generateRandomStartKeys(int numKeys) {
    Random random = ThreadLocalRandom.current();
    byte[][] ret = new byte[numKeys][];
    // first region start key is always empty
    ret[0] = HConstants.EMPTY_BYTE_ARRAY;
    for (int i = 1; i < numKeys; i++) {
      ret[i] = generateData(random, DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  /*
   * This method takes some time and is called inline while uploading data. For example, in the
   * mapfile test, generating the key and value consumes about 30% of CPU time.
   * @return Generated random value to insert into a table cell.
   */
  public static byte[] generateData(final Random r, int length) {
    byte[] b = new byte[length];
    int i;

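    // Fill the array in runs of 8 identical random uppercase letters; cheaper than a fresh
    // random byte per position.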
    for (i = 0; i < (length - 8); i += 8) {
      b[i] = (byte) (65 + r.nextInt(26));
      b[i + 1] = b[i];
      b[i + 2] = b[i];
      b[i + 3] = b[i];
      b[i + 4] = b[i];
      b[i + 5] = b[i];
      b[i + 6] = b[i];
      b[i + 7] = b[i];
    }

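    // Fill the remaining tail bytes (at most 8) with a single random letter.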
    byte a = (byte) (65 + r.nextInt(26));
    for (; i < length; i++) {
      b[i] = a;
    }
    return b;
  }

  private byte[][] generateRandomSplitKeys(int numKeys) {
    Random random = ThreadLocalRandom.current();
    byte[][] ret = new byte[numKeys][];
    for (int i = 0; i < numKeys; i++) {
      ret[i] = generateData(random, DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testMRIncrementalLoad() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoad\n");
    doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad");
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testMRIncrementalLoadWithSplit() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
    doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit");
  }

  /**
   * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true. This test can only check the
   * correctness of the original logic when LOCALITY_SENSITIVE_CONF_KEY is set to true. Because
   * MiniHBaseCluster always runs with a single hostname (and different ports), it is not possible
   * to check region locality by comparing region locations with DN hostnames. Once
   * MiniHBaseCluster supports an explicit hostnames parameter (just like MiniDFSCluster does), we
   * could test region locality features more easily.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testMRIncrementalLoadWithLocality() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
    doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1");
    doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2");
  }

  // @Ignore("Wahtevs")
  @Test
  public void testMRIncrementalLoadWithPutSortReducer() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n");
    doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer");
  }

  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
    boolean putSortReducer, String tableStr) throws Exception {
    doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer,
      Arrays.asList(tableStr));
  }

  @Test
  public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception {
    LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n");
    doIncrementalLoadTest(false, false, true,
      Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList()));
  }

  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
    boolean putSortReducer, List<String> tableStr) throws Exception {
    util = new HBaseTestingUtil();
    Configuration conf = util.getConfiguration();
    conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
    int hostCount = 1;
    int regionNum = 5;
    if (shouldKeepLocality) {
      // We should change host count higher than hdfs replica count when MiniHBaseCluster supports
      // explicit hostnames parameter just like MiniDFSCluster does.
      hostCount = 3;
      regionNum = 20;
    }

    String[] hostnames = new String[hostCount];
    for (int i = 0; i < hostCount; ++i) {
      hostnames[i] = "datanode_" + i;
    }
    StartTestingClusterOption option = StartTestingClusterOption.builder()
      .numRegionServers(hostCount).dataNodeHosts(hostnames).build();
    util.startMiniCluster(option);

    Map<String, Table> allTables = new HashMap<>(tableStr.size());
    List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size());
    boolean writeMultipleTables = tableStr.size() > 1;
    for (String tableStrSingle : tableStr) {
      byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
      TableName tableName = TableName.valueOf(tableStrSingle);
      Table table = util.createTable(tableName, FAMILIES, splitKeys);

      RegionLocator r = util.getConnection().getRegionLocator(tableName);
      assertEquals("Should start with empty table", 0, util.countRows(table));
      int numRegions = r.getStartKeys().length;
      assertEquals("Should make " + regionNum + " regions", regionNum, numRegions);

      allTables.put(tableStrSingle, table);
      tableInfo.add(new HFileOutputFormat2.TableInfo(table.getDescriptor(), r));
    }
    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
    // Generate the bulk load files
    runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer);
    if (writeMultipleTables) {
      testDir = new Path(testDir, "default");
    }

    for (Table tableSingle : allTables.values()) {
      // This doesn't write into the table, just makes files
      assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle));
    }
    int numTableDirs = 0;
    FileStatus[] fss = testDir.getFileSystem(conf).listStatus(testDir);
    for (FileStatus tf : fss) {
      Path tablePath = testDir;
      if (writeMultipleTables) {
        if (allTables.containsKey(tf.getPath().getName())) {
          ++numTableDirs;
          tablePath = tf.getPath();
        } else {
          continue;
        }
      }

      // Make sure that a directory was created for every CF
      int dir = 0;
      fss = tablePath.getFileSystem(conf).listStatus(tablePath);
      for (FileStatus f : fss) {
        for (byte[] family : FAMILIES) {
          if (Bytes.toString(family).equals(f.getPath().getName())) {
            ++dir;
          }
        }
      }
      assertEquals("Column family not found in FS.", FAMILIES.length, dir);
    }
    if (writeMultipleTables) {
      assertEquals("Dir for all input tables not created", allTables.size(), numTableDirs);
    }

    Admin admin = util.getConnection().getAdmin();
    try {
      // handle the split case
      if (shouldChangeRegions) {
        Table chosenTable = allTables.values().iterator().next();
        // Choose a semi-random table if multiple tables are available
        LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString());
        admin.disableTable(chosenTable.getName());
        util.waitUntilNoRegionsInTransition();

        util.deleteTable(chosenTable.getName());
        byte[][] newSplitKeys = generateRandomSplitKeys(14);
        Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys);

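        // 14 split keys yield 15 regions; wait until they are all assigned and the table is
        // available again.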
        while (
          util.getConnection().getRegionLocator(chosenTable.getName()).getAllRegionLocations()
            .size() != 15 || !admin.isTableAvailable(table.getName())
        ) {
          Thread.sleep(200);
          LOG.info("Waiting for new region assignment to happen");
        }
      }

      // Perform the actual load
      for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) {
        Path tableDir = testDir;
        String tableNameStr = singleTableInfo.getTableDescriptor().getTableName().getNameAsString();
        LOG.info("Running BulkLoadHFiles on table " + tableNameStr);
        if (writeMultipleTables) {
          tableDir = new Path(testDir, tableNameStr);
        }
        Table currentTable = allTables.get(tableNameStr);
        TableName currentTableName = currentTable.getName();
        BulkLoadHFiles.create(conf).bulkLoad(currentTableName, tableDir);

        // Ensure data shows up
        int expectedRows = 0;
        if (putSortReducer) {
          // no rows should be extracted
          assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
            util.countRows(currentTable));
        } else {
          expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
          assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
            util.countRows(currentTable));
          Scan scan = new Scan();
          ResultScanner results = currentTable.getScanner(scan);
          for (Result res : results) {
            assertEquals(FAMILIES.length, res.rawCells().length);
            Cell first = res.rawCells()[0];
            for (Cell kv : res.rawCells()) {
              assertTrue(CellUtil.matchingRows(first, kv));
              assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
            }
          }
          results.close();
        }
        String tableDigestBefore = util.checksumRows(currentTable);
        // Check region locality
        HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
        for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) {
          hbd.add(region.getHDFSBlocksDistribution());
        }
        for (String hostname : hostnames) {
          float locality = hbd.getBlockLocalityIndex(hostname);
          LOG.info("locality of [" + hostname + "]: " + locality);
          assertEquals(100, (int) (locality * 100));
        }

        // Cause regions to reopen
        admin.disableTable(currentTableName);
        while (!admin.isTableDisabled(currentTableName)) {
          Thread.sleep(200);
          LOG.info("Waiting for table to disable");
        }
        admin.enableTable(currentTableName);
        util.waitTableAvailable(currentTableName);
        assertEquals("Data should remain after reopening of regions", tableDigestBefore,
          util.checksumRows(currentTable));
      }
    } finally {
      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
        tableInfoSingle.getRegionLocator().close();
      }
      for (Entry<String, Table> singleTable : allTables.entrySet()) {
        singleTable.getValue().close();
        util.deleteTable(singleTable.getValue().getName());
      }
      testDir.getFileSystem(conf).delete(testDir, true);
      util.shutdownMiniCluster();
    }
  }

  private void runIncrementalPELoad(Configuration conf,
    List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, boolean putSortReducer)
    throws IOException, InterruptedException, ClassNotFoundException {
    Job job = new Job(conf, "testLocalMRIncrementalLoad");
    job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
      CellSerialization.class.getName());
    setupRandomGeneratorMapper(job, putSortReducer);
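    // Both configureIncrementalLoad variants below set up the total order partitioner, the sort
    // reducer and one reduce task per region; the asserts verify the reduce task count.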
    if (tableInfo.size() > 1) {
      MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo);
      int sum = 0;
      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
        sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size();
      }
      assertEquals(sum, job.getNumReduceTasks());
    } else {
      RegionLocator regionLocator = tableInfo.get(0).getRegionLocator();
      HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getTableDescriptor(),
        regionLocator);
      assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
    }

    FileOutputFormat.setOutputPath(job, outDir);

    assertFalse(util.getTestFileSystem().exists(outDir));

    assertTrue(job.waitForCompletion(true));
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}. Tests that the
   * family compression map is correctly serialized into and deserialized from configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Compression.Algorithm> familyToCompression =
        getMockColumnFamiliesForCompression(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForCompression(table, familyToCompression);
      conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.compressionDetails,
          Arrays.asList(table.getDescriptor())));

      // read back family specific compression setting from the configuration
      Map<byte[], Algorithm> retrievedFamilyToCompressionMap =
        HFileOutputFormat2.createFamilyCompressionMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
        assertEquals("Compression configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToCompressionMap.get(Bytes.toBytes(entry.getKey())));
      }
    }
  }

  private void setupMockColumnFamiliesForCompression(Table table,
    Map<String, Compression.Algorithm> familyToCompression) throws IOException {

    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
    for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
      ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
        .newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
        .setCompressionType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build();

      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
    }
    Mockito.doReturn(mockTableDescriptor.build()).when(table).getDescriptor();
  }

  /**
   * @return a map from column family names to compression algorithms for testing column family
   *         compression. Column family names have special characters
   */
  private Map<String, Compression.Algorithm> getMockColumnFamiliesForCompression(int numCfs) {
    Map<String, Compression.Algorithm> familyToCompression = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family3", Compression.Algorithm.NONE);
    }
    return familyToCompression;
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}. Tests that the
   * family bloom type map is correctly serialized into and deserialized from configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 2; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, BloomType> familyToBloomType = getMockColumnFamiliesForBloomType(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForBloomType(table, familyToBloomType);
      conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails,
          Arrays.asList(table.getDescriptor())));

      // read back family specific bloom filter settings from the
      // configuration
      Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
        HFileOutputFormat2.createFamilyBloomTypeMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
        assertEquals("BloomType configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToBloomTypeMap.get(Bytes.toBytes(entry.getKey())));
      }
    }
  }

  private void setupMockColumnFamiliesForBloomType(Table table,
    Map<String, BloomType> familyToDataBlockEncoding) throws IOException {
    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
    for (Entry<String, BloomType> entry : familyToDataBlockEncoding.entrySet()) {
      ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
        .newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
        .setBloomFilterType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build();
      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
    }
    Mockito.doReturn(mockTableDescriptor.build()).when(table).getDescriptor();
  }

  /**
   * @return a map from column family names to bloom filter types for testing column family bloom
   *         filter settings. Column family names have special characters
   */
  private Map<String, BloomType> getMockColumnFamiliesForBloomType(int numCfs) {
    Map<String, BloomType> familyToBloomType = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family2=asdads&!AASD", BloomType.ROWCOL);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family3", BloomType.NONE);
    }
    return familyToBloomType;
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}. Tests that the
   * family block size map is correctly serialized into and deserialized from configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Integer> familyToBlockSize = getMockColumnFamiliesForBlockSize(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForBlockSize(table, familyToBlockSize);
      conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.blockSizeDetails,
          Arrays.asList(table.getDescriptor())));

      // read back family specific block size settings from the
      // configuration
      Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
        HFileOutputFormat2.createFamilyBlockSizeMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
        assertEquals("BlockSize configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToBlockSizeMap.get(Bytes.toBytes(entry.getKey())));
      }
    }
  }

  private void setupMockColumnFamiliesForBlockSize(Table table,
    Map<String, Integer> familyToDataBlockEncoding) throws IOException {
    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
    for (Entry<String, Integer> entry : familyToDataBlockEncoding.entrySet()) {
      ColumnFamilyDescriptor columnFamilyDescriptor =
        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
          .setBlocksize(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build();
      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
    }
    Mockito.doReturn(mockTableDescriptor.build()).when(table).getDescriptor();
  }

  /**
   * @return a map from column family names to block sizes for testing column family block size
   *         settings. Column family names have special characters
   */
  private Map<String, Integer> getMockColumnFamiliesForBlockSize(int numCfs) {
    Map<String, Integer> familyToBlockSize = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family1!@#!@#&", 1234);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family3", 0);
    }
    return familyToBlockSize;
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}. Tests that
   * the family data block encoding map is correctly serialized into and deserialized from
   * configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, DataBlockEncoding> familyToDataBlockEncoding =
        getMockColumnFamiliesForDataBlockEncoding(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForDataBlockEncoding(table, familyToDataBlockEncoding);
      TableDescriptor tableDescriptor = table.getDescriptor();
      conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(
          HFileOutputFormat2.dataBlockEncodingDetails, Arrays.asList(tableDescriptor)));

      // read back family specific data block encoding settings from the
      // configuration
      Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
        HFileOutputFormat2.createFamilyDataBlockEncodingMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
        assertEquals(
          "DataBlockEncoding configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(),
          retrievedFamilyToDataBlockEncodingMap.get(Bytes.toBytes(entry.getKey())));
      }
    }
  }

  private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
    Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
    for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
      ColumnFamilyDescriptor columnFamilyDescriptor =
        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
          .setDataBlockEncoding(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0)
          .build();
      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
    }
    Mockito.doReturn(mockTableDescriptor.build()).when(table).getDescriptor();
  }

  /**
   * @return a map from column family names to data block encodings for testing column family data
   *         block encoding settings. Column family names have special characters
   */
  private Map<String, DataBlockEncoding> getMockColumnFamiliesForDataBlockEncoding(int numCfs) {
    Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.FAST_DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.PREFIX);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE);
    }
    return familyToDataBlockEncoding;
  }

  private void setupMockStartKeys(RegionLocator table) throws IOException {
    byte[][] mockKeys = new byte[][] { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("aaa"),
      Bytes.toBytes("ggg"), Bytes.toBytes("zzz") };
    Mockito.doReturn(mockKeys).when(table).getStartKeys();
  }

  private void setupMockTableName(RegionLocator table) throws IOException {
    TableName mockTableName = TableName.valueOf("mock_table");
    Mockito.doReturn(mockTableName).when(table).getName();
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter uses compression and bloom filter settings
   * from the column family descriptor
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testColumnFamilySettings() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("testColumnFamilySettings");

    // Setup table descriptor
    Table table = Mockito.mock(Table.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    TableDescriptorBuilder tableDescriptorBuilder =
      TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);

    Mockito.doReturn(tableDescriptorBuilder.build()).when(table).getDescriptor();
    for (ColumnFamilyDescriptor hcd : HBaseTestingUtil.generateColumnDescriptors()) {
      tableDescriptorBuilder.setColumnFamily(hcd);
    }

    // set up the table to return some mock keys
    setupMockStartKeys(regionLocator);

    try {
      // partial map red setup to get an operational writer for testing
      // We turn off the sequence file compression, because DefaultCodec
      // pollutes the GZip codec pool with an incompatible compressor.
      conf.set("io.seqfile.compression.type", "NONE");
      conf.set("hbase.fs.tmp.dir", dir.toString());
      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);

      Job job = new Job(conf, "testLocalMRIncrementalLoad");
      job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
      setupRandomGeneratorMapper(job, false);
      HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      // write out random rows
      writeRandomKeyValues(writer, context, tableDescriptorBuilder.build().getColumnFamilyNames(),
        ROWSPERSPLIT);
      writer.close(context);

      // Make sure that a directory was created for every CF
      FileSystem fs = dir.getFileSystem(conf);

      // commit so that the filesystem has one directory per column family
      hof.getOutputCommitter(context).commitTask(context);
      hof.getOutputCommitter(context).commitJob(context);
      FileStatus[] families = CommonFSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
      assertEquals(tableDescriptorBuilder.build().getColumnFamilies().length, families.length);
      for (FileStatus f : families) {
        String familyStr = f.getPath().getName();
        ColumnFamilyDescriptor hcd =
          tableDescriptorBuilder.build().getColumnFamily(Bytes.toBytes(familyStr));
        // verify that the compression on this file matches the configured
        // compression
        Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
        Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf);
        Map<byte[], byte[]> fileInfo = reader.getHFileInfo();

        byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY);
        if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
        assertEquals(
          "Incorrect bloom filter used for column family " + familyStr + "(reader: " + reader + ")",
          hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
        assertEquals(
          "Incorrect compression used for column family " + familyStr + "(reader: " + reader + ")",
          hcd.getCompressionType(), reader.getFileContext().getCompression());
      }
    } finally {
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  /**
   * Write random values to the writer assuming a table created using {@link #FAMILIES} as column
   * family descriptors
   */
  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer,
    TaskAttemptContext context, Set<byte[]> families, int numRows)
    throws IOException, InterruptedException {
    byte keyBytes[] = new byte[Bytes.SIZEOF_INT];
    int valLength = 10;
    byte valBytes[] = new byte[valLength];

    int taskId = context.getTaskAttemptID().getTaskID().getId();
1213    assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
1214    final byte[] qualifier = Bytes.toBytes("data");
1215    for (int i = 0; i < numRows; i++) {
1216      Bytes.putInt(keyBytes, 0, i);
1217      Bytes.random(valBytes);
1218      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
1219      for (byte[] family : families) {
1220        Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
1221        writer.write(key, kv);
1222      }
1223    }
1224  }
1225
1226  /**
1227   * Tests the scenario from HBASE-6901: all files are bulk loaded and excluded from minor
1228   * compaction. Without the fix for HBASE-6901, an ArrayIndexOutOfBoundsException will be
1229   * thrown.
1230   */
1231  @Ignore("Flaky: See HBASE-9051")
1232  @Test
1233  public void testExcludeAllFromMinorCompaction() throws Exception {
1234    Configuration conf = util.getConfiguration();
1235    conf.setInt("hbase.hstore.compaction.min", 2);
1236    generateRandomStartKeys(5);
1237
1238    util.startMiniCluster();
1239    try (Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin();
1240      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
1241      RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) {
1242      final FileSystem fs = util.getDFSCluster().getFileSystem();
1243      assertEquals("Should start with empty table", 0, util.countRows(table));
1244
1245      // deep inspection: get the StoreFile dir
1246      final Path storePath =
1247        new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
1248          new Path(admin.getRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
1249            Bytes.toString(FAMILIES[0])));
1250      assertEquals(0, fs.listStatus(storePath).length);
1251
1252      // Generate two bulk load files
1253      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);
1254
1255      for (int i = 0; i < 2; i++) {
1256        Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
1257        runIncrementalPELoad(conf,
1258          Arrays.asList(new HFileOutputFormat2.TableInfo(table.getDescriptor(),
1259            conn.getRegionLocator(TABLE_NAMES[0]))),
1260          testDir, false);
1261        // Perform the actual load
1262        BulkLoadHFiles.create(conf).bulkLoad(table.getName(), testDir);
1263      }
1264
1265      // Ensure data shows up
1266      int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1267      assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
1268        util.countRows(table));
1269
1270      // should have a second StoreFile now
1271      assertEquals(2, fs.listStatus(storePath).length);
1272
1273      // minor compactions shouldn't get rid of the file
1274      admin.compact(TABLE_NAMES[0]);
1275      try {
1276        quickPoll(new Callable<Boolean>() {
1277          @Override
1278          public Boolean call() throws Exception {
1279            List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
1280            for (HRegion region : regions) {
1281              for (HStore store : region.getStores()) {
1282                store.closeAndArchiveCompactedFiles();
1283              }
1284            }
1285            return fs.listStatus(storePath).length == 1;
1286          }
1287        }, 5000);
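        // quickPoll returning means the store was compacted down to one file, i.e. the excluded
        // bulk-loaded file was compacted away; report the store file count and fail.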
1288        throw new IOException("SF# = " + fs.listStatus(storePath).length);
1289      } catch (AssertionError ae) {
1290        // expected: the excluded file survives minor compaction, so quickPoll times out
1291      }
1292
1293      // a major compaction should work though
1294      admin.majorCompact(TABLE_NAMES[0]);
1295      quickPoll(new Callable<Boolean>() {
1296        @Override
1297        public Boolean call() throws Exception {
1298          List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
1299          for (HRegion region : regions) {
1300            for (HStore store : region.getStores()) {
1301              store.closeAndArchiveCompactedFiles();
1302            }
1303          }
1304          return fs.listStatus(storePath).length == 1;
1305        }
1306      }, 5000);
1307
1308    } finally {
1309      util.shutdownMiniCluster();
1310    }
1311  }
1312
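  /**
   * Similar to {@link #testExcludeAllFromMinorCompaction()}, but starts from a table that already
   * has one flushed store file. The bulk-loaded file is marked as excluded from minor compaction,
   * so a minor compaction leaves both store files in place while a major compaction merges them
   * into one.
   */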
1313  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
1314  @Test
1315  public void testExcludeMinorCompaction() throws Exception {
1316    Configuration conf = util.getConfiguration();
1317    conf.setInt("hbase.hstore.compaction.min", 2);
1318    generateRandomStartKeys(5);
1319
1320    util.startMiniCluster();
1321    try (Connection conn = ConnectionFactory.createConnection(conf);
1322      Admin admin = conn.getAdmin()) {
1323      Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
1324      final FileSystem fs = util.getDFSCluster().getFileSystem();
1325      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
1326      assertEquals("Should start with empty table", 0, util.countRows(table));
1327
1328      // deep inspection: get the StoreFile dir
1329      final Path storePath =
1330        new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
1331          new Path(admin.getRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
1332            Bytes.toString(FAMILIES[0])));
1333      assertEquals(0, fs.listStatus(storePath).length);
1334
1335      // put some data in it and flush to create a storefile
1336      Put p = new Put(Bytes.toBytes("test"));
1337      p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
1338      table.put(p);
1339      admin.flush(TABLE_NAMES[0]);
1340      assertEquals(1, util.countRows(table));
1341      quickPoll(new Callable<Boolean>() {
1342        @Override
1343        public Boolean call() throws Exception {
1344          return fs.listStatus(storePath).length == 1;
1345        }
1346      }, 5000);
1347
1348      // Generate a bulk load file with more rows
1349      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);
1350
1351      RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]);
1352      runIncrementalPELoad(conf,
1353        Arrays.asList(new HFileOutputFormat2.TableInfo(table.getDescriptor(), regionLocator)),
1354        testDir, false);
1355
1356      // Perform the actual load
1357      BulkLoadHFiles.create(conf).bulkLoad(table.getName(), testDir);
1358
1359      // Ensure data shows up
1360      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1361      assertEquals("BulkLoadHFiles should put expected data in table", expectedRows + 1,
1362        util.countRows(table));
1363
1364      // should have a second StoreFile now
1365      assertEquals(2, fs.listStatus(storePath).length);
1366
1367      // minor compactions shouldn't get rid of the file
1368      admin.compact(TABLE_NAMES[0]);
1369      try {
1370        quickPoll(new Callable<Boolean>() {
1371          @Override
1372          public Boolean call() throws Exception {
1373            return fs.listStatus(storePath).length == 1;
1374          }
1375        }, 5000);
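        // quickPoll returning means the store was compacted down to one file, i.e. the excluded
        // bulk-loaded file was compacted away; report the store file count and fail.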
1376        throw new IOException("SF# = " + fs.listStatus(storePath).length);
1377      } catch (AssertionError ae) {
1378        // expected: the excluded file survives minor compaction, so quickPoll times out
1379      }
1380
1381      // a major compaction should work though
1382      admin.majorCompact(TABLE_NAMES[0]);
1383      quickPoll(new Callable<Boolean>() {
1384        @Override
1385        public Boolean call() throws Exception {
1386          return fs.listStatus(storePath).length == 1;
1387        }
1388      }, 5000);
1389
1390    } finally {
1391      util.shutdownMiniCluster();
1392    }
1393  }
1394
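  /**
   * Polls the given condition roughly every 10 ms until it returns true or {@code waitMs}
   * milliseconds have elapsed, failing the test on timeout. For example (equivalent to the
   * anonymous {@code Callable} instances used in this class):
   *
   * <pre>
   * // wait up to five seconds for the store directory to contain exactly one file
   * quickPoll(() -&gt; fs.listStatus(storePath).length == 1, 5000);
   * </pre>
   */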
1395  private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
1396    int sleepMs = 10;
1397    int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
1398    while (retries-- > 0) {
1399      if (c.call().booleanValue()) {
1400        return;
1401      }
1402      Thread.sleep(sleepMs);
1403    }
1404    fail("Condition not met within " + waitMs + " ms");
1405  }
1406
1407  public static void main(String[] args) throws Exception {
1408    new TestHFileOutputFormat2().manualTest(args);
1409  }
1410
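  /**
   * Manual entry point, invoked via {@link #main(String[])}: {@code newtable <tablename>} creates
   * a pre-split table, while {@code incremental <tablename>} runs an incremental-load job against
   * an existing table and writes the HFiles to the {@code incremental-out} directory.
   */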
1411  public void manualTest(String[] args) throws Exception {
1412    Configuration conf = HBaseConfiguration.create();
1413    util = new HBaseTestingUtil(conf);
1414    if ("newtable".equals(args[0])) {
1415      TableName tname = TableName.valueOf(args[1]);
1416      byte[][] splitKeys = generateRandomSplitKeys(4);
1417      util.createTable(tname, FAMILIES, splitKeys);
1418    } else if ("incremental".equals(args[0])) {
1419      TableName tname = TableName.valueOf(args[1]);
1420      try (Connection c = ConnectionFactory.createConnection(conf); Admin admin = c.getAdmin();
1421        RegionLocator regionLocator = c.getRegionLocator(tname)) {
1422        Path outDir = new Path("incremental-out");
1423        runIncrementalPELoad(conf,
1424          Arrays
1425            .asList(new HFileOutputFormat2.TableInfo(admin.getDescriptor(tname), regionLocator)),
1426          outDir, false);
1427      }
1428    } else {
1429      throw new RuntimeException("usage: TestHFileOutputFormat2 newtable | incremental");
1430    }
1431  }
1432
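  /**
   * Verifies that {@link HFileOutputFormat2#configureStoragePolicy} applies the configured block
   * storage policies to the column family output directories: the per-family override ("ONE_SSD")
   * for the first family and the table-wide default ("ALL_SSD") for the second, starting from the
   * HDFS default of "HOT".
   */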
1433  @Test
1434  public void testBlockStoragePolicy() throws Exception {
1435    util = new HBaseTestingUtil();
1436    Configuration conf = util.getConfiguration();
1437    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");
1438
1439    conf.set(
1440      HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes
1441        .toString(HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0])),
1442      "ONE_SSD");
1443    Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
1444    Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
1445    util.startMiniDFSCluster(3);
1446    FileSystem fs = util.getDFSCluster().getFileSystem();
1447    try {
1448      fs.mkdirs(cf1Dir);
1449      fs.mkdirs(cf2Dir);
1450
1451      // the default block storage policy is HOT
1452      String spA = getStoragePolicyName(fs, cf1Dir);
1453      String spB = getStoragePolicyName(fs, cf2Dir);
1454      LOG.debug("Storage policy of cf 0: [" + spA + "].");
1455      LOG.debug("Storage policy of cf 1: [" + spB + "].");
1456      assertEquals("HOT", spA);
1457      assertEquals("HOT", spB);
1458
1459      // apply the configured storage policies to the column family directories
1460      HFileOutputFormat2.configureStoragePolicy(conf, fs,
1461        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir);
1462      HFileOutputFormat2.configureStoragePolicy(conf, fs,
1463        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir);
1464      spA = getStoragePolicyName(fs, cf1Dir);
1465      spB = getStoragePolicyName(fs, cf2Dir);
1466      LOG.debug("Storage policy of cf 0: [" + spA + "].");
1467      LOG.debug("Storage policy of cf 1: [" + spB + "].");
1468      assertNotNull(spA);
1469      assertEquals("ONE_SSD", spA);
1470      assertNotNull(spB);
1471      assertEquals("ALL_SSD", spB);
1472    } finally {
1473      fs.delete(cf1Dir, true);
1474      fs.delete(cf2Dir, true);
1475      util.shutdownMiniDFSCluster();
1476    }
1477  }
1478
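  /**
   * Returns the name of the block storage policy set on {@code path}, first via reflection on the
   * FileSystem API and, if that fails (e.g. on older HDFS versions), via
   * {@link #getStoragePolicyNameForOldHDFSVersion(FileSystem, Path)}, defaulting to "HOT".
   */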
1479  private String getStoragePolicyName(FileSystem fs, Path path) {
1480    try {
1481      Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path);
1482      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
1483    } catch (Exception e) {
1484      // May fail when running against an older HDFS version; fall back to the old API
1485      if (LOG.isTraceEnabled()) {
1486        LOG.trace("Failed to get policy directly", e);
1487      }
1488      String policy = getStoragePolicyNameForOldHDFSVersion(fs, path);
1489      return policy == null ? "HOT" : policy; // HOT by default
1490    }
1491  }
1492
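  /**
   * Fallback for HDFS clients whose FileSystem API does not expose getStoragePolicy: resolves the
   * storage policy id from the file status against the policies reported by the
   * DistributedFileSystem. Returns null if the policy cannot be determined.
   */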
1493  private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) {
1494    try {
1495      if (fs instanceof DistributedFileSystem) {
1496        DistributedFileSystem dfs = (DistributedFileSystem) fs;
1497        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
1498        if (null != status) {
1499          byte storagePolicyId = status.getStoragePolicy();
1500          Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
1501          if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) {
1502            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
1503            for (BlockStoragePolicy policy : policies) {
1504              if (policy.getId() == storagePolicyId) {
1505                return policy.getName();
1506              }
1507            }
1508          }
1509        }
1510      }
1511    } catch (Throwable e) {
1512      LOG.warn("Failed to get block storage policy of [" + path + "]", e);
1513    }
1514
1515    return null;
1516  }
1517
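  /**
   * Runs {@link HFileOutputFormat2#configurePartitioner} as a user other than the current one and
   * verifies that the job is configured to use {@link TotalOrderPartitioner} and that the
   * partition file is created under that user's home directory.
   */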
1518  @Test
1519  public void testConfigurePartitioner() throws Exception {
1520    util.startMiniDFSCluster(1);
1521    try {
1522      Configuration conf = util.getConfiguration();
1523      // Create a user who is not the current user
1524      String fooUserName = "foo1234";
1525      String fooGroupName = "group1";
1526      UserGroupInformation ugi =
1527        UserGroupInformation.createUserForTesting(fooUserName, new String[] { fooGroupName });
1528      // Get user's home directory
1529      Path fooHomeDirectory = ugi.doAs(new PrivilegedAction<Path>() {
1530        @Override
1531        public Path run() {
1532          try (FileSystem fs = FileSystem.get(conf)) {
1533            return fs.makeQualified(fs.getHomeDirectory());
1534          } catch (IOException ioe) {
1535            LOG.error("Failed to get foo's home directory", ioe);
1536          }
1537          return null;
1538        }
1539      });
1540      // create the home directory and chown
1541      FileSystem fs = FileSystem.get(conf);
1542      fs.mkdirs(fooHomeDirectory);
1543      fs.setOwner(fooHomeDirectory, fooUserName, fooGroupName);
1544
1545      Job job = Mockito.mock(Job.class);
1546      Mockito.doReturn(conf).when(job).getConfiguration();
1547      ImmutableBytesWritable writable = new ImmutableBytesWritable();
1548      List<ImmutableBytesWritable> splitPoints = new ArrayList<ImmutableBytesWritable>();
1549      splitPoints.add(writable);
1550
1551      ugi.doAs(new PrivilegedAction<Void>() {
1552        @Override
1553        public Void run() {
1554          try {
1555            HFileOutputFormat2.configurePartitioner(job, splitPoints, false);
1556          } catch (IOException ioe) {
1557            LOG.error("Failed to configure partitioner", ioe);
1558          }
1559          return null;
1560        }
1561      });
1562      // verify that the job uses TotalOrderPartitioner
1563      verify(job).setPartitionerClass(TotalOrderPartitioner.class);
1564      // verify that TotalOrderPartitioner.setPartitionFile() is called.
1565      String partitionPathString = conf.get("mapreduce.totalorderpartitioner.path");
1566      Assert.assertNotNull(partitionPathString);
1567      // Make sure the partition file is in foo1234's home directory, and that
1568      // the file exists.
1569      Assert.assertTrue(partitionPathString.startsWith(fooHomeDirectory.toString()));
1570      Assert.assertTrue(fs.exists(new Path(partitionPathString)));
1571    } finally {
1572      util.shutdownMiniDFSCluster();
1573    }
1574  }
1575
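  /**
   * Verifies that the compression override set via
   * {@link HFileOutputFormat2#COMPRESSION_OVERRIDE_CONF_KEY} ("gz") is applied to the HFiles
   * produced by the record writer.
   */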
1576  @Test
1577  public void testConfigureCompression() throws Exception {
1578    Configuration conf = new Configuration(this.util.getConfiguration());
1579    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
1580    TaskAttemptContext context = null;
1581    Path dir = util.getDataTestDir("TestConfigureCompression");
1582    String hfileoutputformatCompression = "gz";
1583
1584    try {
1585      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
1586      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
1587
1588      conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY, hfileoutputformatCompression);
1589
1590      Job job = Job.getInstance(conf);
1591      FileOutputFormat.setOutputPath(job, dir);
1592      context = createTestTaskAttemptContext(job);
1593      HFileOutputFormat2 hof = new HFileOutputFormat2();
1594      writer = hof.getRecordWriter(context);
1595      final byte[] b = Bytes.toBytes("b");
1596
1597      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b);
1598      writer.write(new ImmutableBytesWritable(), kv);
1599      writer.close(context);
1600      writer = null;
1601      FileSystem fs = dir.getFileSystem(conf);
1602      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
1603      while (iterator.hasNext()) {
1604        LocatedFileStatus keyFileStatus = iterator.next();
1605        HFile.Reader reader =
1606          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
1607        assertEquals(hfileoutputformatCompression,
1608          reader.getTrailer().getCompressionCodec().getName());
1609      }
1610    } finally {
1611      if (writer != null && context != null) {
1612        writer.close(context);
1613      }
1614      dir.getFileSystem(conf).delete(dir, true);
1615    }
1617  }
1618
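  /**
   * Starts two mini clusters and configures an incremental-load job for a table on cluster B
   * while the job itself carries cluster A's configuration. Verifies that cluster B's ZooKeeper
   * settings, as well as keys set under the remote-cluster configuration prefix, are propagated
   * to the connections the job creates.
   */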
1619  @Test
1620  public void testMRIncrementalLoadWithLocalityMultiCluster() throws Exception {
1621    // Start cluster A
1622    util = new HBaseTestingUtil();
1623    Configuration confA = util.getConfiguration();
1624    int hostCount = 3;
1625    int regionNum = 20;
1626    String[] hostnames = new String[hostCount];
1627    for (int i = 0; i < hostCount; ++i) {
1628      hostnames[i] = "datanode_" + i;
1629    }
1630    StartTestingClusterOption option = StartTestingClusterOption.builder()
1631      .numRegionServers(hostCount).dataNodeHosts(hostnames).build();
1632    util.startMiniCluster(option);
1633
1634    // Start cluster B
1635    HBaseTestingUtil utilB = new HBaseTestingUtil();
1636    Configuration confB = utilB.getConfiguration();
1637    utilB.startMiniCluster(option);
1638
1639    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
1640
1641    byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
1642    TableName tableName = TableName.valueOf("table");
1643    // Create table in cluster B
1644    try (Table table = utilB.createTable(tableName, FAMILIES, splitKeys);
1645      RegionLocator r = utilB.getConnection().getRegionLocator(tableName)) {
1646      // Generate the bulk load files
1647      // The job carries the ZooKeeper configuration of cluster A.
1648      // We read from cluster A (e.g. via TableInputFormat) and create HFiles for cluster B.
1649      Job job = new Job(confA, "testLocalMRIncrementalLoad");
1650      Configuration jobConf = job.getConfiguration();
1651      final UUID key = ConfigurationCaptorConnection.configureConnectionImpl(jobConf);
1652      job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
1653      setupRandomGeneratorMapper(job, false);
1654      HFileOutputFormat2.configureIncrementalLoad(job, table, r);
1655
1656      assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
1657        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY));
1658      assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
1659        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY));
1660      assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
1661        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY));
1662
1663      String bSpecificConfigKey = "my.override.config.for.b";
1664      String bSpecificConfigValue = "b-specific-value";
1665      jobConf.set(HFileOutputFormat2.REMOTE_CLUSTER_CONF_PREFIX + bSpecificConfigKey,
1666        bSpecificConfigValue);
1667
1668      FileOutputFormat.setOutputPath(job, testDir);
1669
1670      assertFalse(util.getTestFileSystem().exists(testDir));
1671
1672      assertTrue(job.waitForCompletion(true));
1673
1674      final List<Configuration> configs =
1675        ConfigurationCaptorConnection.getCapturedConfigurations(key);
1676
1677      assertFalse(configs.isEmpty());
1678      for (Configuration config : configs) {
1679        assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
1680          config.get(HConstants.ZOOKEEPER_QUORUM));
1681        assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
1682          config.get(HConstants.ZOOKEEPER_CLIENT_PORT));
1683        assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
1684          config.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
1685
1686        assertEquals(bSpecificConfigValue, config.get(bSpecificConfigKey));
1687      }
1688    } finally {
1689      utilB.deleteTable(tableName);
1690      testDir.getFileSystem(confA).delete(testDir, true);
1691      util.shutdownMiniCluster();
1692      utilB.shutdownMiniCluster();
1693    }
1694  }
1695
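  /**
   * A {@link Connection} implementation that records every {@link Configuration} it is
   * constructed with, keyed by a UUID stamped into the configuration, and delegates all other
   * calls to a real connection. Tests use it to inspect the settings passed to connections
   * created on their behalf, e.g. (mirroring testMRIncrementalLoadWithLocalityMultiCluster):
   *
   * <pre>
   * UUID key = ConfigurationCaptorConnection.configureConnectionImpl(jobConf);
   * // ... run the job ...
   * List&lt;Configuration&gt; captured = ConfigurationCaptorConnection.getCapturedConfigurations(key);
   * </pre>
   */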
1696  private static class ConfigurationCaptorConnection implements Connection {
1697    private static final String UUID_KEY = "ConfigurationCaptorConnection.uuid";
1698
1699    private static final Map<UUID, List<Configuration>> confs = new ConcurrentHashMap<>();
1700
1701    private final Connection delegate;
1702
1703    public ConfigurationCaptorConnection(Configuration conf, ExecutorService es, User user,
1704      ConnectionRegistry registry, Map<String, byte[]> connectionAttributes) throws IOException {
1705      // We do not use this registry, so close it.
1706      registry.close();
1707      // Use createAsyncConnection here to avoid infinite recursion, since we override the
1708      // Connection implementation in configureConnectionImpl below.
1709      delegate =
1710        FutureUtils.get(ConnectionFactory.createAsyncConnection(conf, user, connectionAttributes))
1711          .toConnection();
1712
1713      final String uuid = conf.get(UUID_KEY);
1714      if (uuid != null) {
1715        confs.computeIfAbsent(UUID.fromString(uuid), u -> new CopyOnWriteArrayList<>()).add(conf);
1716      }
1717    }
1718
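    /**
     * Points {@code ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL} at this class and stamps the
     * configuration with a fresh UUID; the returned key is later passed to
     * {@link #getCapturedConfigurations(UUID)} to retrieve the captured configurations.
     */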
1719    static UUID configureConnectionImpl(Configuration conf) {
1720      conf.setClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
1721        ConfigurationCaptorConnection.class, Connection.class);
1722
1723      final UUID uuid = UUID.randomUUID();
1724      conf.set(UUID_KEY, uuid.toString());
1725      return uuid;
1726    }
1727
1728    static List<Configuration> getCapturedConfigurations(UUID key) {
1729      return confs.get(key);
1730    }
1731
1732    @Override
1733    public Configuration getConfiguration() {
1734      return delegate.getConfiguration();
1735    }
1736
1737    @Override
1738    public Table getTable(TableName tableName) throws IOException {
1739      return delegate.getTable(tableName);
1740    }
1741
1742    @Override
1743    public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
1744      return delegate.getTable(tableName, pool);
1745    }
1746
1747    @Override
1748    public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
1749      return delegate.getBufferedMutator(tableName);
1750    }
1751
1752    @Override
1753    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
1754      return delegate.getBufferedMutator(params);
1755    }
1756
1757    @Override
1758    public RegionLocator getRegionLocator(TableName tableName) throws IOException {
1759      return delegate.getRegionLocator(tableName);
1760    }
1761
1762    @Override
1763    public void clearRegionLocationCache() {
1764      delegate.clearRegionLocationCache();
1765    }
1766
1767    @Override
1768    public Admin getAdmin() throws IOException {
1769      return delegate.getAdmin();
1770    }
1771
1772    @Override
1773    public void close() throws IOException {
1774      delegate.close();
1775    }
1776
1777    @Override
1778    public boolean isClosed() {
1779      return delegate.isClosed();
1780    }
1781
1782    @Override
1783    public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
1784      return delegate.getTableBuilder(tableName, pool);
1785    }
1786
1787    @Override
1788    public AsyncConnection toAsyncConnection() {
1789      return delegate.toAsyncConnection();
1790    }
1791
1792    @Override
1793    public String getClusterId() {
1794      return delegate.getClusterId();
1795    }
1796
1797    @Override
1798    public Hbck getHbck() throws IOException {
1799      return delegate.getHbck();
1800    }
1801
1802    @Override
1803    public Hbck getHbck(ServerName masterServer) throws IOException {
1804      return delegate.getHbck(masterServer);
1805    }
1806
1807    @Override
1808    public void abort(String why, Throwable e) {
1809      delegate.abort(why, e);
1810    }
1811
1812    @Override
1813    public boolean isAborted() {
1814      return delegate.isAborted();
1815    }
1816  }
1817
1818}