001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertNotNull; 024import static org.junit.Assert.assertNotSame; 025import static org.junit.Assert.assertTrue; 026import static org.junit.Assert.fail; 027import static org.mockito.Mockito.verify; 028 029import java.io.IOException; 030import java.lang.reflect.Field; 031import java.security.PrivilegedAction; 032import java.util.ArrayList; 033import java.util.Arrays; 034import java.util.HashMap; 035import java.util.List; 036import java.util.Map; 037import java.util.Map.Entry; 038import java.util.Random; 039import java.util.Set; 040import java.util.UUID; 041import java.util.concurrent.Callable; 042import java.util.concurrent.ConcurrentHashMap; 043import java.util.concurrent.CopyOnWriteArrayList; 044import java.util.concurrent.ExecutorService; 045import java.util.concurrent.ThreadLocalRandom; 046import java.util.stream.Collectors; 047import java.util.stream.Stream; 048import org.apache.hadoop.conf.Configuration; 049import org.apache.hadoop.fs.FileStatus; 050import org.apache.hadoop.fs.FileSystem; 051import org.apache.hadoop.fs.LocatedFileStatus; 052import org.apache.hadoop.fs.Path; 053import org.apache.hadoop.fs.RemoteIterator; 054import org.apache.hadoop.hbase.ArrayBackedTag; 055import org.apache.hadoop.hbase.Cell; 056import org.apache.hadoop.hbase.CellUtil; 057import org.apache.hadoop.hbase.CompatibilitySingletonFactory; 058import org.apache.hadoop.hbase.ExtendedCell; 059import org.apache.hadoop.hbase.HBaseClassTestRule; 060import org.apache.hadoop.hbase.HBaseConfiguration; 061import org.apache.hadoop.hbase.HBaseTestingUtil; 062import org.apache.hadoop.hbase.HConstants; 063import org.apache.hadoop.hbase.HDFSBlocksDistribution; 064import org.apache.hadoop.hbase.HadoopShims; 065import org.apache.hadoop.hbase.KeyValue; 066import org.apache.hadoop.hbase.PrivateCellUtil; 067import org.apache.hadoop.hbase.ServerName; 068import org.apache.hadoop.hbase.StartTestingClusterOption; 069import org.apache.hadoop.hbase.TableName; 070import org.apache.hadoop.hbase.Tag; 071import org.apache.hadoop.hbase.TagType; 072import org.apache.hadoop.hbase.client.Admin; 073import org.apache.hadoop.hbase.client.AsyncConnection; 074import org.apache.hadoop.hbase.client.BufferedMutator; 075import org.apache.hadoop.hbase.client.BufferedMutatorParams; 076import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 077import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 078import 
org.apache.hadoop.hbase.client.Connection; 079import org.apache.hadoop.hbase.client.ConnectionFactory; 080import org.apache.hadoop.hbase.client.ConnectionRegistry; 081import org.apache.hadoop.hbase.client.ConnectionUtils; 082import org.apache.hadoop.hbase.client.Hbck; 083import org.apache.hadoop.hbase.client.Put; 084import org.apache.hadoop.hbase.client.RegionLocator; 085import org.apache.hadoop.hbase.client.Result; 086import org.apache.hadoop.hbase.client.ResultScanner; 087import org.apache.hadoop.hbase.client.Scan; 088import org.apache.hadoop.hbase.client.Table; 089import org.apache.hadoop.hbase.client.TableBuilder; 090import org.apache.hadoop.hbase.client.TableDescriptor; 091import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 092import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 093import org.apache.hadoop.hbase.io.compress.Compression; 094import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; 095import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 096import org.apache.hadoop.hbase.io.hfile.CacheConfig; 097import org.apache.hadoop.hbase.io.hfile.HFile; 098import org.apache.hadoop.hbase.io.hfile.HFile.Reader; 099import org.apache.hadoop.hbase.io.hfile.HFileScanner; 100import org.apache.hadoop.hbase.regionserver.BloomType; 101import org.apache.hadoop.hbase.regionserver.HRegion; 102import org.apache.hadoop.hbase.regionserver.HStore; 103import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem; 104import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; 105import org.apache.hadoop.hbase.security.User; 106import org.apache.hadoop.hbase.testclassification.LargeTests; 107import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; 108import org.apache.hadoop.hbase.tool.BulkLoadHFiles; 109import org.apache.hadoop.hbase.util.Bytes; 110import org.apache.hadoop.hbase.util.CommonFSUtils; 111import org.apache.hadoop.hbase.util.FSUtils; 112import org.apache.hadoop.hbase.util.FutureUtils; 113import org.apache.hadoop.hbase.util.ReflectionUtils; 114import org.apache.hadoop.hdfs.DistributedFileSystem; 115import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; 116import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; 117import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; 118import org.apache.hadoop.io.NullWritable; 119import org.apache.hadoop.mapreduce.Job; 120import org.apache.hadoop.mapreduce.Mapper; 121import org.apache.hadoop.mapreduce.RecordWriter; 122import org.apache.hadoop.mapreduce.TaskAttemptContext; 123import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 124import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; 125import org.apache.hadoop.security.UserGroupInformation; 126import org.junit.Assert; 127import org.junit.ClassRule; 128import org.junit.Ignore; 129import org.junit.Test; 130import org.junit.experimental.categories.Category; 131import org.mockito.Mockito; 132import org.slf4j.Logger; 133import org.slf4j.LoggerFactory; 134 135/** 136 * Simple test for {@link HFileOutputFormat2}. Sets up and runs a mapreduce job that writes hfile 137 * output. Creates a few inner classes to implement splits and an inputformat that emits keys and 138 * values. 
139 */ 140@Category({ VerySlowMapReduceTests.class, LargeTests.class }) 141public class TestHFileOutputFormat2 { 142 143 @ClassRule 144 public static final HBaseClassTestRule CLASS_RULE = 145 HBaseClassTestRule.forClass(TestHFileOutputFormat2.class); 146 147 private final static int ROWSPERSPLIT = 1024; 148 private static final int DEFAULT_VALUE_LENGTH = 1000; 149 150 public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME; 151 private static final byte[][] FAMILIES = 152 { Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B")) }; 153 private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2", "TestTable3") 154 .map(TableName::valueOf).toArray(TableName[]::new); 155 156 private HBaseTestingUtil util = new HBaseTestingUtil(); 157 158 private static final Logger LOG = LoggerFactory.getLogger(TestHFileOutputFormat2.class); 159 160 /** 161 * Simple mapper that makes KeyValue output. 162 */ 163 static class RandomKVGeneratingMapper 164 extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> { 165 166 private int keyLength; 167 private static final int KEYLEN_DEFAULT = 10; 168 private static final String KEYLEN_CONF = "randomkv.key.length"; 169 170 private int valLength; 171 private static final int VALLEN_DEFAULT = 10; 172 private static final String VALLEN_CONF = "randomkv.val.length"; 173 private static final byte[] QUALIFIER = Bytes.toBytes("data"); 174 private boolean multiTableMapper = false; 175 private TableName[] tables = null; 176 177 @Override 178 protected void setup(Context context) throws IOException, InterruptedException { 179 super.setup(context); 180 181 Configuration conf = context.getConfiguration(); 182 keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); 183 valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); 184 multiTableMapper = 185 conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false); 186 if (multiTableMapper) { 187 tables = TABLE_NAMES; 188 } else { 189 tables = new TableName[] { TABLE_NAMES[0] }; 190 } 191 } 192 193 @Override 194 protected void map(NullWritable n1, NullWritable n2, 195 Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context) 196 throws java.io.IOException, InterruptedException { 197 198 byte keyBytes[] = new byte[keyLength]; 199 byte valBytes[] = new byte[valLength]; 200 201 int taskId = context.getTaskAttemptID().getTaskID().getId(); 202 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 203 byte[] key; 204 for (int j = 0; j < tables.length; ++j) { 205 for (int i = 0; i < ROWSPERSPLIT; i++) { 206 Bytes.random(keyBytes); 207 // Ensure that unique tasks generate unique keys 208 keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); 209 Bytes.random(valBytes); 210 key = keyBytes; 211 if (multiTableMapper) { 212 key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes); 213 } 214 215 for (byte[] family : TestHFileOutputFormat2.FAMILIES) { 216 Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes); 217 context.write(new ImmutableBytesWritable(key), kv); 218 } 219 } 220 } 221 } 222 } 223 224 /** 225 * Simple mapper that makes Put output. 
226 */ 227 static class RandomPutGeneratingMapper 228 extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put> { 229 230 private int keyLength; 231 private static final int KEYLEN_DEFAULT = 10; 232 private static final String KEYLEN_CONF = "randomkv.key.length"; 233 234 private int valLength; 235 private static final int VALLEN_DEFAULT = 10; 236 private static final String VALLEN_CONF = "randomkv.val.length"; 237 private static final byte[] QUALIFIER = Bytes.toBytes("data"); 238 private boolean multiTableMapper = false; 239 private TableName[] tables = null; 240 241 @Override 242 protected void setup(Context context) throws IOException, InterruptedException { 243 super.setup(context); 244 245 Configuration conf = context.getConfiguration(); 246 keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT); 247 valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT); 248 multiTableMapper = 249 conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false); 250 if (multiTableMapper) { 251 tables = TABLE_NAMES; 252 } else { 253 tables = new TableName[] { TABLE_NAMES[0] }; 254 } 255 } 256 257 @Override 258 protected void map(NullWritable n1, NullWritable n2, 259 Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put>.Context context) 260 throws java.io.IOException, InterruptedException { 261 262 byte keyBytes[] = new byte[keyLength]; 263 byte valBytes[] = new byte[valLength]; 264 265 int taskId = context.getTaskAttemptID().getTaskID().getId(); 266 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 267 268 byte[] key; 269 for (int j = 0; j < tables.length; ++j) { 270 for (int i = 0; i < ROWSPERSPLIT; i++) { 271 Bytes.random(keyBytes); 272 // Ensure that unique tasks generate unique keys 273 keyBytes[keyLength - 1] = (byte) (taskId & 0xFF); 274 Bytes.random(valBytes); 275 key = keyBytes; 276 if (multiTableMapper) { 277 key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes); 278 } 279 280 for (byte[] family : TestHFileOutputFormat2.FAMILIES) { 281 Put p = new Put(keyBytes); 282 p.addColumn(family, QUALIFIER, valBytes); 283 // set TTL to very low so that the scan does not return any value 284 p.setTTL(1l); 285 context.write(new ImmutableBytesWritable(key), p); 286 } 287 } 288 } 289 } 290 } 291 292 private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) { 293 if (putSortReducer) { 294 job.setInputFormatClass(NMapInputFormat.class); 295 job.setMapperClass(RandomPutGeneratingMapper.class); 296 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 297 job.setMapOutputValueClass(Put.class); 298 } else { 299 job.setInputFormatClass(NMapInputFormat.class); 300 job.setMapperClass(RandomKVGeneratingMapper.class); 301 job.setMapOutputKeyClass(ImmutableBytesWritable.class); 302 job.setMapOutputValueClass(KeyValue.class); 303 } 304 } 305 306 /** 307 * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if passed a keyvalue whose 308 * timestamp is {@link HConstants#LATEST_TIMESTAMP}. 309 * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a> 310 */ 311 @Ignore("Goes zombie too frequently; needs work. 
See HBASE-14563") 312 @Test 313 public void test_LATEST_TIMESTAMP_isReplaced() throws Exception { 314 Configuration conf = new Configuration(this.util.getConfiguration()); 315 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 316 TaskAttemptContext context = null; 317 Path dir = util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced"); 318 try { 319 Job job = new Job(conf); 320 FileOutputFormat.setOutputPath(job, dir); 321 context = createTestTaskAttemptContext(job); 322 HFileOutputFormat2 hof = new HFileOutputFormat2(); 323 writer = hof.getRecordWriter(context); 324 final byte[] b = Bytes.toBytes("b"); 325 326 // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be 327 // changed by call to write. Check all in kv is same but ts. 328 KeyValue kv = new KeyValue(b, b, b); 329 KeyValue original = kv.clone(); 330 writer.write(new ImmutableBytesWritable(), kv); 331 assertFalse(original.equals(kv)); 332 assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv))); 333 assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv))); 334 assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv))); 335 assertNotSame(original.getTimestamp(), kv.getTimestamp()); 336 assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp()); 337 338 // Test 2. Now test passing a kv that has explicit ts. It should not be 339 // changed by call to record write. 340 kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b); 341 original = kv.clone(); 342 writer.write(new ImmutableBytesWritable(), kv); 343 assertTrue(original.equals(kv)); 344 } finally { 345 if (writer != null && context != null) writer.close(context); 346 dir.getFileSystem(conf).delete(dir, true); 347 } 348 } 349 350 private TaskAttemptContext createTestTaskAttemptContext(final Job job) throws Exception { 351 HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class); 352 TaskAttemptContext context = 353 hadoop.createTestTaskAttemptContext(job, "attempt_201402131733_0001_m_000000_0"); 354 return context; 355 } 356 357 /* 358 * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE metadata used by 359 * time-restricted scans. 360 */ 361 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 362 @Test 363 public void test_TIMERANGE() throws Exception { 364 Configuration conf = new Configuration(this.util.getConfiguration()); 365 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 366 TaskAttemptContext context = null; 367 Path dir = util.getDataTestDir("test_TIMERANGE_present"); 368 LOG.info("Timerange dir writing to dir: " + dir); 369 try { 370 // build a record writer using HFileOutputFormat2 371 Job job = new Job(conf); 372 FileOutputFormat.setOutputPath(job, dir); 373 context = createTestTaskAttemptContext(job); 374 HFileOutputFormat2 hof = new HFileOutputFormat2(); 375 writer = hof.getRecordWriter(context); 376 377 // Pass two key values with explicit times stamps 378 final byte[] b = Bytes.toBytes("b"); 379 380 // value 1 with timestamp 2000 381 KeyValue kv = new KeyValue(b, b, b, 2000, b); 382 KeyValue original = kv.clone(); 383 writer.write(new ImmutableBytesWritable(), kv); 384 assertEquals(original, kv); 385 386 // value 2 with timestamp 1000 387 kv = new KeyValue(b, b, b, 1000, b); 388 original = kv.clone(); 389 writer.write(new ImmutableBytesWritable(), kv); 390 assertEquals(original, kv); 391 392 // verify that the file has the proper FileInfo. 
393 writer.close(context); 394 395 // the generated file lives 1 directory down from the attempt directory 396 // and is the only file, e.g. 397 // _attempt__0000_r_000000_0/b/1979617994050536795 398 FileSystem fs = FileSystem.get(conf); 399 Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent(); 400 FileStatus[] sub1 = fs.listStatus(attemptDirectory); 401 FileStatus[] file = fs.listStatus(sub1[0].getPath()); 402 403 // open as HFile Reader and pull out TIMERANGE FileInfo. 404 HFile.Reader rd = 405 HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf); 406 Map<byte[], byte[]> finfo = rd.getHFileInfo(); 407 byte[] range = finfo.get(Bytes.toBytes("TIMERANGE")); 408 assertNotNull(range); 409 410 // unmarshall and check values. 411 TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(range); 412 LOG.info(timeRangeTracker.getMin() + "...." + timeRangeTracker.getMax()); 413 assertEquals(1000, timeRangeTracker.getMin()); 414 assertEquals(2000, timeRangeTracker.getMax()); 415 rd.close(); 416 } finally { 417 if (writer != null && context != null) writer.close(context); 418 dir.getFileSystem(conf).delete(dir, true); 419 } 420 } 421 422 /** 423 * Run small MR job. 424 */ 425 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 426 @Test 427 public void testWritingPEData() throws Exception { 428 Configuration conf = util.getConfiguration(); 429 Path testDir = util.getDataTestDirOnTestFS("testWritingPEData"); 430 FileSystem fs = testDir.getFileSystem(conf); 431 432 // Set down this value or we OOME in eclipse. 433 conf.setInt("mapreduce.task.io.sort.mb", 20); 434 // Write a few files. 435 long hregionMaxFilesize = 10 * 1024; 436 conf.setLong(HConstants.HREGION_MAX_FILESIZE, hregionMaxFilesize); 437 438 Job job = new Job(conf, "testWritingPEData"); 439 setupRandomGeneratorMapper(job, false); 440 // This partitioner doesn't work well for number keys but using it anyways 441 // just to demonstrate how to configure it. 442 byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; 443 byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT]; 444 445 Arrays.fill(startKey, (byte) 0); 446 Arrays.fill(endKey, (byte) 0xff); 447 448 job.setPartitionerClass(SimpleTotalOrderPartitioner.class); 449 // Set start and end rows for partitioner. 450 SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey); 451 SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey); 452 job.setReducerClass(CellSortReducer.class); 453 job.setOutputFormatClass(HFileOutputFormat2.class); 454 job.setNumReduceTasks(4); 455 job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), 456 MutationSerialization.class.getName(), ResultSerialization.class.getName(), 457 CellSerialization.class.getName()); 458 459 FileOutputFormat.setOutputPath(job, testDir); 460 assertTrue(job.waitForCompletion(false)); 461 FileStatus[] files = fs.listStatus(testDir); 462 assertTrue(files.length > 0); 463 464 // check output file num and size. 
465 for (byte[] family : FAMILIES) { 466 long kvCount = 0; 467 RemoteIterator<LocatedFileStatus> iterator = 468 fs.listFiles(testDir.suffix("/" + new String(family)), true); 469 while (iterator.hasNext()) { 470 LocatedFileStatus keyFileStatus = iterator.next(); 471 HFile.Reader reader = 472 HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf); 473 HFileScanner scanner = reader.getScanner(conf, false, false, false); 474 475 kvCount += reader.getEntries(); 476 scanner.seekTo(); 477 long perKVSize = scanner.getCell().getSerializedSize(); 478 assertTrue("Data size of each file should not be too large.", 479 perKVSize * reader.getEntries() <= hregionMaxFilesize); 480 } 481 assertEquals("Should write expected data in output file.", ROWSPERSPLIT, kvCount); 482 } 483 } 484 485 /** 486 * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as ttl into hfile. 487 */ 488 @Test 489 public void test_WritingTagData() throws Exception { 490 Configuration conf = new Configuration(this.util.getConfiguration()); 491 final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version"; 492 conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS); 493 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 494 TaskAttemptContext context = null; 495 Path dir = util.getDataTestDir("WritingTagData"); 496 try { 497 conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString()); 498 // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs 499 conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); 500 Job job = new Job(conf); 501 FileOutputFormat.setOutputPath(job, dir); 502 context = createTestTaskAttemptContext(job); 503 HFileOutputFormat2 hof = new HFileOutputFormat2(); 504 writer = hof.getRecordWriter(context); 505 final byte[] b = Bytes.toBytes("b"); 506 507 List<Tag> tags = new ArrayList<>(); 508 tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670))); 509 KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags); 510 writer.write(new ImmutableBytesWritable(), kv); 511 writer.close(context); 512 writer = null; 513 FileSystem fs = dir.getFileSystem(conf); 514 RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true); 515 while (iterator.hasNext()) { 516 LocatedFileStatus keyFileStatus = iterator.next(); 517 HFile.Reader reader = 518 HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf); 519 HFileScanner scanner = reader.getScanner(conf, false, false, false); 520 scanner.seekTo(); 521 ExtendedCell cell = scanner.getCell(); 522 List<Tag> tagsFromCell = PrivateCellUtil.getTags(cell); 523 assertTrue(tagsFromCell.size() > 0); 524 for (Tag tag : tagsFromCell) { 525 assertTrue(tag.getType() == TagType.TTL_TAG_TYPE); 526 } 527 } 528 } finally { 529 if (writer != null && context != null) writer.close(context); 530 dir.getFileSystem(conf).delete(dir, true); 531 } 532 } 533 534 @Ignore("Goes zombie too frequently; needs work. 
See HBASE-14563") 535 @Test 536 public void testJobConfiguration() throws Exception { 537 Configuration conf = new Configuration(this.util.getConfiguration()); 538 conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, 539 util.getDataTestDir("testJobConfiguration").toString()); 540 Job job = new Job(conf); 541 job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration")); 542 Table table = Mockito.mock(Table.class); 543 RegionLocator regionLocator = Mockito.mock(RegionLocator.class); 544 setupMockStartKeys(regionLocator); 545 setupMockTableName(regionLocator); 546 HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator); 547 assertEquals(job.getNumReduceTasks(), 4); 548 } 549 550 private byte[][] generateRandomStartKeys(int numKeys) { 551 Random random = ThreadLocalRandom.current(); 552 byte[][] ret = new byte[numKeys][]; 553 // first region start key is always empty 554 ret[0] = HConstants.EMPTY_BYTE_ARRAY; 555 for (int i = 1; i < numKeys; i++) { 556 ret[i] = generateData(random, DEFAULT_VALUE_LENGTH); 557 } 558 return ret; 559 } 560 561 /* 562 * This method takes some time and is done inline uploading data. For example, doing the mapfile 563 * test, generation of the key and value consumes about 30% of CPU time. 564 * @return Generated random value to insert into a table cell. 565 */ 566 public static byte[] generateData(final Random r, int length) { 567 byte[] b = new byte[length]; 568 int i; 569 570 for (i = 0; i < (length - 8); i += 8) { 571 b[i] = (byte) (65 + r.nextInt(26)); 572 b[i + 1] = b[i]; 573 b[i + 2] = b[i]; 574 b[i + 3] = b[i]; 575 b[i + 4] = b[i]; 576 b[i + 5] = b[i]; 577 b[i + 6] = b[i]; 578 b[i + 7] = b[i]; 579 } 580 581 byte a = (byte) (65 + r.nextInt(26)); 582 for (; i < length; i++) { 583 b[i] = a; 584 } 585 return b; 586 } 587 588 private byte[][] generateRandomSplitKeys(int numKeys) { 589 Random random = ThreadLocalRandom.current(); 590 byte[][] ret = new byte[numKeys][]; 591 for (int i = 0; i < numKeys; i++) { 592 ret[i] = generateData(random, DEFAULT_VALUE_LENGTH); 593 } 594 return ret; 595 } 596 597 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 598 @Test 599 public void testMRIncrementalLoad() throws Exception { 600 LOG.info("\nStarting test testMRIncrementalLoad\n"); 601 doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad"); 602 } 603 604 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 605 @Test 606 public void testMRIncrementalLoadWithSplit() throws Exception { 607 LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n"); 608 doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit"); 609 } 610 611 /** 612 * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true This test could only check the 613 * correctness of original logic if LOCALITY_SENSITIVE_CONF_KEY is set to true. Because 614 * MiniHBaseCluster always run with single hostname (and different ports), it's not possible to 615 * check the region locality by comparing region locations and DN hostnames. When MiniHBaseCluster 616 * supports explicit hostnames parameter (just like MiniDFSCluster does), we could test region 617 * locality features more easily. 618 */ 619 @Ignore("Goes zombie too frequently; needs work. 
See HBASE-14563") 620 @Test 621 public void testMRIncrementalLoadWithLocality() throws Exception { 622 LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n"); 623 doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1"); 624 doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2"); 625 } 626 627 // @Ignore("Wahtevs") 628 @Test 629 public void testMRIncrementalLoadWithPutSortReducer() throws Exception { 630 LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n"); 631 doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer"); 632 } 633 634 private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality, 635 boolean putSortReducer, String tableStr) throws Exception { 636 doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer, 637 Arrays.asList(tableStr)); 638 } 639 640 @Test 641 public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception { 642 LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n"); 643 doIncrementalLoadTest(false, false, true, 644 Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList())); 645 } 646 647 private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality, 648 boolean putSortReducer, List<String> tableStr) throws Exception { 649 util = new HBaseTestingUtil(); 650 Configuration conf = util.getConfiguration(); 651 conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality); 652 int hostCount = 1; 653 int regionNum = 5; 654 if (shouldKeepLocality) { 655 // We should change host count higher than hdfs replica count when MiniHBaseCluster supports 656 // explicit hostnames parameter just like MiniDFSCluster does. 
657 hostCount = 3; 658 regionNum = 20; 659 } 660 661 String[] hostnames = new String[hostCount]; 662 for (int i = 0; i < hostCount; ++i) { 663 hostnames[i] = "datanode_" + i; 664 } 665 StartTestingClusterOption option = StartTestingClusterOption.builder() 666 .numRegionServers(hostCount).dataNodeHosts(hostnames).build(); 667 util.startMiniCluster(option); 668 669 Map<String, Table> allTables = new HashMap<>(tableStr.size()); 670 List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size()); 671 boolean writeMultipleTables = tableStr.size() > 1; 672 for (String tableStrSingle : tableStr) { 673 byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1); 674 TableName tableName = TableName.valueOf(tableStrSingle); 675 Table table = util.createTable(tableName, FAMILIES, splitKeys); 676 677 RegionLocator r = util.getConnection().getRegionLocator(tableName); 678 assertEquals("Should start with empty table", 0, util.countRows(table)); 679 int numRegions = r.getStartKeys().length; 680 assertEquals("Should make " + regionNum + " regions", numRegions, regionNum); 681 682 allTables.put(tableStrSingle, table); 683 tableInfo.add(new HFileOutputFormat2.TableInfo(table.getDescriptor(), r)); 684 } 685 Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad"); 686 // Generate the bulk load files 687 runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer); 688 if (writeMultipleTables) { 689 testDir = new Path(testDir, "default"); 690 } 691 692 for (Table tableSingle : allTables.values()) { 693 // This doesn't write into the table, just makes files 694 assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle)); 695 } 696 int numTableDirs = 0; 697 FileStatus[] fss = testDir.getFileSystem(conf).listStatus(testDir); 698 for (FileStatus tf : fss) { 699 Path tablePath = testDir; 700 if (writeMultipleTables) { 701 if (allTables.containsKey(tf.getPath().getName())) { 702 ++numTableDirs; 703 tablePath = tf.getPath(); 704 } else { 705 continue; 706 } 707 } 708 709 // Make sure that a directory was created for every CF 710 int dir = 0; 711 fss = tablePath.getFileSystem(conf).listStatus(tablePath); 712 for (FileStatus f : fss) { 713 for (byte[] family : FAMILIES) { 714 if (Bytes.toString(family).equals(f.getPath().getName())) { 715 ++dir; 716 } 717 } 718 } 719 assertEquals("Column family not found in FS.", FAMILIES.length, dir); 720 } 721 if (writeMultipleTables) { 722 assertEquals("Dir for all input tables not created", numTableDirs, allTables.size()); 723 } 724 725 Admin admin = util.getConnection().getAdmin(); 726 try { 727 // handle the split case 728 if (shouldChangeRegions) { 729 Table chosenTable = allTables.values().iterator().next(); 730 // Choose a semi-random table if multiple tables are available 731 LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString()); 732 admin.disableTable(chosenTable.getName()); 733 util.waitUntilNoRegionsInTransition(); 734 735 util.deleteTable(chosenTable.getName()); 736 byte[][] newSplitKeys = generateRandomSplitKeys(14); 737 Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys); 738 739 while ( 740 util.getConnection().getRegionLocator(chosenTable.getName()).getAllRegionLocations() 741 .size() != 15 || !admin.isTableAvailable(table.getName()) 742 ) { 743 Thread.sleep(200); 744 LOG.info("Waiting for new region assignment to happen"); 745 } 746 } 747 748 // Perform the actual load 749 for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) { 750 Path tableDir = 
testDir; 751 String tableNameStr = singleTableInfo.getTableDescriptor().getTableName().getNameAsString(); 752 LOG.info("Running BulkLoadHFiles on table" + tableNameStr); 753 if (writeMultipleTables) { 754 tableDir = new Path(testDir, tableNameStr); 755 } 756 Table currentTable = allTables.get(tableNameStr); 757 TableName currentTableName = currentTable.getName(); 758 BulkLoadHFiles.create(conf).bulkLoad(currentTableName, tableDir); 759 760 // Ensure data shows up 761 int expectedRows = 0; 762 if (putSortReducer) { 763 // no rows should be extracted 764 assertEquals("BulkLoadHFiles should put expected data in table", expectedRows, 765 util.countRows(currentTable)); 766 } else { 767 expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 768 assertEquals("BulkLoadHFiles should put expected data in table", expectedRows, 769 util.countRows(currentTable)); 770 Scan scan = new Scan(); 771 ResultScanner results = currentTable.getScanner(scan); 772 for (Result res : results) { 773 assertEquals(FAMILIES.length, res.rawCells().length); 774 Cell first = res.rawCells()[0]; 775 for (Cell kv : res.rawCells()) { 776 assertTrue(CellUtil.matchingRows(first, kv)); 777 assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv))); 778 } 779 } 780 results.close(); 781 } 782 String tableDigestBefore = util.checksumRows(currentTable); 783 // Check region locality 784 HDFSBlocksDistribution hbd = new HDFSBlocksDistribution(); 785 for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) { 786 hbd.add(region.getHDFSBlocksDistribution()); 787 } 788 for (String hostname : hostnames) { 789 float locality = hbd.getBlockLocalityIndex(hostname); 790 LOG.info("locality of [" + hostname + "]: " + locality); 791 assertEquals(100, (int) (locality * 100)); 792 } 793 794 // Cause regions to reopen 795 admin.disableTable(currentTableName); 796 while (!admin.isTableDisabled(currentTableName)) { 797 Thread.sleep(200); 798 LOG.info("Waiting for table to disable"); 799 } 800 admin.enableTable(currentTableName); 801 util.waitTableAvailable(currentTableName); 802 assertEquals("Data should remain after reopening of regions", tableDigestBefore, 803 util.checksumRows(currentTable)); 804 } 805 } finally { 806 for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) { 807 tableInfoSingle.getRegionLocator().close(); 808 } 809 for (Entry<String, Table> singleTable : allTables.entrySet()) { 810 singleTable.getValue().close(); 811 util.deleteTable(singleTable.getValue().getName()); 812 } 813 testDir.getFileSystem(conf).delete(testDir, true); 814 util.shutdownMiniCluster(); 815 } 816 } 817 818 private void runIncrementalPELoad(Configuration conf, 819 List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, boolean putSortReducer) 820 throws IOException, InterruptedException, ClassNotFoundException { 821 Job job = new Job(conf, "testLocalMRIncrementalLoad"); 822 job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad")); 823 job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"), 824 MutationSerialization.class.getName(), ResultSerialization.class.getName(), 825 CellSerialization.class.getName()); 826 setupRandomGeneratorMapper(job, putSortReducer); 827 if (tableInfo.size() > 1) { 828 MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo); 829 int sum = 0; 830 for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) { 831 sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size(); 832 } 833 assertEquals(sum, 
job.getNumReduceTasks()); 834 } else { 835 RegionLocator regionLocator = tableInfo.get(0).getRegionLocator(); 836 HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getTableDescriptor(), 837 regionLocator); 838 assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks()); 839 } 840 841 FileOutputFormat.setOutputPath(job, outDir); 842 843 assertFalse(util.getTestFileSystem().exists(outDir)); 844 845 assertTrue(job.waitForCompletion(true)); 846 } 847 848 /** 849 * Test for {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}. Tests that the 850 * family compression map is correctly serialized into and deserialized from configuration 851 */ 852 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 853 @Test 854 public void testSerializeDeserializeFamilyCompressionMap() throws IOException { 855 for (int numCfs = 0; numCfs <= 3; numCfs++) { 856 Configuration conf = new Configuration(this.util.getConfiguration()); 857 Map<String, Compression.Algorithm> familyToCompression = 858 getMockColumnFamiliesForCompression(numCfs); 859 Table table = Mockito.mock(Table.class); 860 setupMockColumnFamiliesForCompression(table, familyToCompression); 861 conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY, 862 HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.compressionDetails, 863 Arrays.asList(table.getDescriptor()))); 864 865 // read back family specific compression setting from the configuration 866 Map<byte[], Algorithm> retrievedFamilyToCompressionMap = 867 HFileOutputFormat2.createFamilyCompressionMap(conf); 868 869 // test that we have a value for all column families that matches with the 870 // used mock values 871 for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) { 872 assertEquals("Compression configuration incorrect for column family:" + entry.getKey(), 873 entry.getValue(), retrievedFamilyToCompressionMap.get(Bytes.toBytes(entry.getKey()))); 874 } 875 } 876 } 877 878 private void setupMockColumnFamiliesForCompression(Table table, 879 Map<String, Compression.Algorithm> familyToCompression) throws IOException { 880 881 TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]); 882 for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) { 883 ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder 884 .newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1) 885 .setCompressionType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build(); 886 887 mockTableDescriptor.setColumnFamily(columnFamilyDescriptor); 888 } 889 Mockito.doReturn(mockTableDescriptor.build()).when(table).getDescriptor(); 890 } 891 892 /** 893 * @return a map from column family names to compression algorithms for testing column family 894 * compression. 
Column family names have special characters 895 */ 896 private Map<String, Compression.Algorithm> getMockColumnFamiliesForCompression(int numCfs) { 897 Map<String, Compression.Algorithm> familyToCompression = new HashMap<>(); 898 // use column family names having special characters 899 if (numCfs-- > 0) { 900 familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO); 901 } 902 if (numCfs-- > 0) { 903 familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY); 904 } 905 if (numCfs-- > 0) { 906 familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ); 907 } 908 if (numCfs-- > 0) { 909 familyToCompression.put("Family3", Compression.Algorithm.NONE); 910 } 911 return familyToCompression; 912 } 913 914 /** 915 * Test for {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}. Tests that the 916 * family bloom type map is correctly serialized into and deserialized from configuration 917 */ 918 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 919 @Test 920 public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException { 921 for (int numCfs = 0; numCfs <= 2; numCfs++) { 922 Configuration conf = new Configuration(this.util.getConfiguration()); 923 Map<String, BloomType> familyToBloomType = getMockColumnFamiliesForBloomType(numCfs); 924 Table table = Mockito.mock(Table.class); 925 setupMockColumnFamiliesForBloomType(table, familyToBloomType); 926 conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY, 927 HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails, 928 Arrays.asList(table.getDescriptor()))); 929 930 // read back family specific data block encoding settings from the 931 // configuration 932 Map<byte[], BloomType> retrievedFamilyToBloomTypeMap = 933 HFileOutputFormat2.createFamilyBloomTypeMap(conf); 934 935 // test that we have a value for all column families that matches with the 936 // used mock values 937 for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) { 938 assertEquals("BloomType configuration incorrect for column family:" + entry.getKey(), 939 entry.getValue(), retrievedFamilyToBloomTypeMap.get(Bytes.toBytes(entry.getKey()))); 940 } 941 } 942 } 943 944 private void setupMockColumnFamiliesForBloomType(Table table, 945 Map<String, BloomType> familyToDataBlockEncoding) throws IOException { 946 TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]); 947 for (Entry<String, BloomType> entry : familyToDataBlockEncoding.entrySet()) { 948 ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder 949 .newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1) 950 .setBloomFilterType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build(); 951 mockTableDescriptor.setColumnFamily(columnFamilyDescriptor); 952 } 953 Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor(); 954 } 955 956 /** 957 * @return a map from column family names to compression algorithms for testing column family 958 * compression. 
Column family names have special characters 959 */ 960 private Map<String, BloomType> getMockColumnFamiliesForBloomType(int numCfs) { 961 Map<String, BloomType> familyToBloomType = new HashMap<>(); 962 // use column family names having special characters 963 if (numCfs-- > 0) { 964 familyToBloomType.put("Family1!@#!@#&", BloomType.ROW); 965 } 966 if (numCfs-- > 0) { 967 familyToBloomType.put("Family2=asdads&!AASD", BloomType.ROWCOL); 968 } 969 if (numCfs-- > 0) { 970 familyToBloomType.put("Family3", BloomType.NONE); 971 } 972 return familyToBloomType; 973 } 974 975 /** 976 * Test for {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}. Tests that the 977 * family block size map is correctly serialized into and deserialized from configuration 978 */ 979 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 980 @Test 981 public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException { 982 for (int numCfs = 0; numCfs <= 3; numCfs++) { 983 Configuration conf = new Configuration(this.util.getConfiguration()); 984 Map<String, Integer> familyToBlockSize = getMockColumnFamiliesForBlockSize(numCfs); 985 Table table = Mockito.mock(Table.class); 986 setupMockColumnFamiliesForBlockSize(table, familyToBlockSize); 987 conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY, 988 HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.blockSizeDetails, 989 Arrays.asList(table.getDescriptor()))); 990 991 // read back family specific data block encoding settings from the 992 // configuration 993 Map<byte[], Integer> retrievedFamilyToBlockSizeMap = 994 HFileOutputFormat2.createFamilyBlockSizeMap(conf); 995 996 // test that we have a value for all column families that matches with the 997 // used mock values 998 for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) { 999 assertEquals("BlockSize configuration incorrect for column family:" + entry.getKey(), 1000 entry.getValue(), retrievedFamilyToBlockSizeMap.get(Bytes.toBytes(entry.getKey()))); 1001 } 1002 } 1003 } 1004 1005 private void setupMockColumnFamiliesForBlockSize(Table table, 1006 Map<String, Integer> familyToDataBlockEncoding) throws IOException { 1007 TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]); 1008 for (Entry<String, Integer> entry : familyToDataBlockEncoding.entrySet()) { 1009 ColumnFamilyDescriptor columnFamilyDescriptor = 1010 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1) 1011 .setBlocksize(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build(); 1012 mockTableDescriptor.setColumnFamily(columnFamilyDescriptor); 1013 } 1014 Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor(); 1015 } 1016 1017 /** 1018 * @return a map from column family names to compression algorithms for testing column family 1019 * compression. 
Column family names have special characters 1020 */ 1021 private Map<String, Integer> getMockColumnFamiliesForBlockSize(int numCfs) { 1022 Map<String, Integer> familyToBlockSize = new HashMap<>(); 1023 // use column family names having special characters 1024 if (numCfs-- > 0) { 1025 familyToBlockSize.put("Family1!@#!@#&", 1234); 1026 } 1027 if (numCfs-- > 0) { 1028 familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE); 1029 } 1030 if (numCfs-- > 0) { 1031 familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE); 1032 } 1033 if (numCfs-- > 0) { 1034 familyToBlockSize.put("Family3", 0); 1035 } 1036 return familyToBlockSize; 1037 } 1038 1039 /** 1040 * Test for {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}. Tests that 1041 * the family data block encoding map is correctly serialized into and deserialized from 1042 * configuration 1043 */ 1044 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 1045 @Test 1046 public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException { 1047 for (int numCfs = 0; numCfs <= 3; numCfs++) { 1048 Configuration conf = new Configuration(this.util.getConfiguration()); 1049 Map<String, DataBlockEncoding> familyToDataBlockEncoding = 1050 getMockColumnFamiliesForDataBlockEncoding(numCfs); 1051 Table table = Mockito.mock(Table.class); 1052 setupMockColumnFamiliesForDataBlockEncoding(table, familyToDataBlockEncoding); 1053 TableDescriptor tableDescriptor = table.getDescriptor(); 1054 conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY, 1055 HFileOutputFormat2.serializeColumnFamilyAttribute( 1056 HFileOutputFormat2.dataBlockEncodingDetails, Arrays.asList(tableDescriptor))); 1057 1058 // read back family specific data block encoding settings from the 1059 // configuration 1060 Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap = 1061 HFileOutputFormat2.createFamilyDataBlockEncodingMap(conf); 1062 1063 // test that we have a value for all column families that matches with the 1064 // used mock values 1065 for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) { 1066 assertEquals( 1067 "DataBlockEncoding configuration incorrect for column family:" + entry.getKey(), 1068 entry.getValue(), 1069 retrievedFamilyToDataBlockEncodingMap.get(Bytes.toBytes(entry.getKey()))); 1070 } 1071 } 1072 } 1073 1074 private void setupMockColumnFamiliesForDataBlockEncoding(Table table, 1075 Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException { 1076 TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]); 1077 for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) { 1078 ColumnFamilyDescriptor columnFamilyDescriptor = 1079 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1) 1080 .setDataBlockEncoding(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0) 1081 .build(); 1082 mockTableDescriptor.setColumnFamily(columnFamilyDescriptor); 1083 } 1084 Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor(); 1085 } 1086 1087 /** 1088 * @return a map from column family names to compression algorithms for testing column family 1089 * compression. 
Column family names have special characters 1090 */ 1091 private Map<String, DataBlockEncoding> getMockColumnFamiliesForDataBlockEncoding(int numCfs) { 1092 Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>(); 1093 // use column family names having special characters 1094 if (numCfs-- > 0) { 1095 familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF); 1096 } 1097 if (numCfs-- > 0) { 1098 familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.FAST_DIFF); 1099 } 1100 if (numCfs-- > 0) { 1101 familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.PREFIX); 1102 } 1103 if (numCfs-- > 0) { 1104 familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE); 1105 } 1106 return familyToDataBlockEncoding; 1107 } 1108 1109 private void setupMockStartKeys(RegionLocator table) throws IOException { 1110 byte[][] mockKeys = new byte[][] { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("aaa"), 1111 Bytes.toBytes("ggg"), Bytes.toBytes("zzz") }; 1112 Mockito.doReturn(mockKeys).when(table).getStartKeys(); 1113 } 1114 1115 private void setupMockTableName(RegionLocator table) throws IOException { 1116 TableName mockTableName = TableName.valueOf("mock_table"); 1117 Mockito.doReturn(mockTableName).when(table).getName(); 1118 } 1119 1120 /** 1121 * Test that {@link HFileOutputFormat2} RecordWriter uses compression and bloom filter settings 1122 * from the column family descriptor 1123 */ 1124 @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") 1125 @Test 1126 public void testColumnFamilySettings() throws Exception { 1127 Configuration conf = new Configuration(this.util.getConfiguration()); 1128 RecordWriter<ImmutableBytesWritable, Cell> writer = null; 1129 TaskAttemptContext context = null; 1130 Path dir = util.getDataTestDir("testColumnFamilySettings"); 1131 1132 // Setup table descriptor 1133 Table table = Mockito.mock(Table.class); 1134 RegionLocator regionLocator = Mockito.mock(RegionLocator.class); 1135 TableDescriptorBuilder tableDescriptorBuilder = 1136 TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]); 1137 1138 Mockito.doReturn(tableDescriptorBuilder.build()).when(table).getDescriptor(); 1139 for (ColumnFamilyDescriptor hcd : HBaseTestingUtil.generateColumnDescriptors()) { 1140 tableDescriptorBuilder.setColumnFamily(hcd); 1141 } 1142 1143 // set up the table to return some mock keys 1144 setupMockStartKeys(regionLocator); 1145 1146 try { 1147 // partial map red setup to get an operational writer for testing 1148 // We turn off the sequence file compression, because DefaultCodec 1149 // pollutes the GZip codec pool with an incompatible compressor. 
1150 conf.set("io.seqfile.compression.type", "NONE"); 1151 conf.set("hbase.fs.tmp.dir", dir.toString()); 1152 // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs 1153 conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false); 1154 1155 Job job = new Job(conf, "testLocalMRIncrementalLoad"); 1156 job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings")); 1157 setupRandomGeneratorMapper(job, false); 1158 HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator); 1159 FileOutputFormat.setOutputPath(job, dir); 1160 context = createTestTaskAttemptContext(job); 1161 HFileOutputFormat2 hof = new HFileOutputFormat2(); 1162 writer = hof.getRecordWriter(context); 1163 1164 // write out random rows 1165 writeRandomKeyValues(writer, context, tableDescriptorBuilder.build().getColumnFamilyNames(), 1166 ROWSPERSPLIT); 1167 writer.close(context); 1168 1169 // Make sure that a directory was created for every CF 1170 FileSystem fs = dir.getFileSystem(conf); 1171 1172 // commit so that the filesystem has one directory per column family 1173 hof.getOutputCommitter(context).commitTask(context); 1174 hof.getOutputCommitter(context).commitJob(context); 1175 FileStatus[] families = CommonFSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs)); 1176 assertEquals(tableDescriptorBuilder.build().getColumnFamilies().length, families.length); 1177 for (FileStatus f : families) { 1178 String familyStr = f.getPath().getName(); 1179 ColumnFamilyDescriptor hcd = 1180 tableDescriptorBuilder.build().getColumnFamily(Bytes.toBytes(familyStr)); 1181 // verify that the compression on this file matches the configured 1182 // compression 1183 Path dataFilePath = fs.listStatus(f.getPath())[0].getPath(); 1184 Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf); 1185 Map<byte[], byte[]> fileInfo = reader.getHFileInfo(); 1186 1187 byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY); 1188 if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE"); 1189 assertEquals( 1190 "Incorrect bloom filter used for column family " + familyStr + "(reader: " + reader + ")", 1191 hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter))); 1192 assertEquals( 1193 "Incorrect compression used for column family " + familyStr + "(reader: " + reader + ")", 1194 hcd.getCompressionType(), reader.getFileContext().getCompression()); 1195 } 1196 } finally { 1197 dir.getFileSystem(conf).delete(dir, true); 1198 } 1199 } 1200 1201 /** 1202 * Write random values to the writer assuming a table created using {@link #FAMILIES} as column 1203 * family descriptors 1204 */ 1205 private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer, 1206 TaskAttemptContext context, Set<byte[]> families, int numRows) 1207 throws IOException, InterruptedException { 1208 byte keyBytes[] = new byte[Bytes.SIZEOF_INT]; 1209 int valLength = 10; 1210 byte valBytes[] = new byte[valLength]; 1211 1212 int taskId = context.getTaskAttemptID().getTaskID().getId(); 1213 assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!"; 1214 final byte[] qualifier = Bytes.toBytes("data"); 1215 for (int i = 0; i < numRows; i++) { 1216 Bytes.putInt(keyBytes, 0, i); 1217 Bytes.random(valBytes); 1218 ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes); 1219 for (byte[] family : families) { 1220 Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes); 1221 writer.write(key, kv); 1222 } 1223 } 
1224 } 1225 1226 /** 1227 * This test is to test the scenario happened in HBASE-6901. All files are bulk loaded and 1228 * excluded from minor compaction. Without the fix of HBASE-6901, an 1229 * ArrayIndexOutOfBoundsException will be thrown. 1230 */ 1231 @Ignore("Flakey: See HBASE-9051") 1232 @Test 1233 public void testExcludeAllFromMinorCompaction() throws Exception { 1234 Configuration conf = util.getConfiguration(); 1235 conf.setInt("hbase.hstore.compaction.min", 2); 1236 generateRandomStartKeys(5); 1237 1238 util.startMiniCluster(); 1239 try (Connection conn = ConnectionFactory.createConnection(); Admin admin = conn.getAdmin(); 1240 Table table = util.createTable(TABLE_NAMES[0], FAMILIES); 1241 RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) { 1242 final FileSystem fs = util.getDFSCluster().getFileSystem(); 1243 assertEquals("Should start with empty table", 0, util.countRows(table)); 1244 1245 // deep inspection: get the StoreFile dir 1246 final Path storePath = 1247 new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]), 1248 new Path(admin.getRegions(TABLE_NAMES[0]).get(0).getEncodedName(), 1249 Bytes.toString(FAMILIES[0]))); 1250 assertEquals(0, fs.listStatus(storePath).length); 1251 1252 // Generate two bulk load files 1253 conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true); 1254 1255 for (int i = 0; i < 2; i++) { 1256 Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i); 1257 runIncrementalPELoad(conf, 1258 Arrays.asList(new HFileOutputFormat2.TableInfo(table.getDescriptor(), 1259 conn.getRegionLocator(TABLE_NAMES[0]))), 1260 testDir, false); 1261 // Perform the actual load 1262 BulkLoadHFiles.create(conf).bulkLoad(table.getName(), testDir); 1263 } 1264 1265 // Ensure data shows up 1266 int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 1267 assertEquals("BulkLoadHFiles should put expected data in table", expectedRows, 1268 util.countRows(table)); 1269 1270 // should have a second StoreFile now 1271 assertEquals(2, fs.listStatus(storePath).length); 1272 1273 // minor compactions shouldn't get rid of the file 1274 admin.compact(TABLE_NAMES[0]); 1275 try { 1276 quickPoll(new Callable<Boolean>() { 1277 @Override 1278 public Boolean call() throws Exception { 1279 List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]); 1280 for (HRegion region : regions) { 1281 for (HStore store : region.getStores()) { 1282 store.closeAndArchiveCompactedFiles(); 1283 } 1284 } 1285 return fs.listStatus(storePath).length == 1; 1286 } 1287 }, 5000); 1288 throw new IOException("SF# = " + fs.listStatus(storePath).length); 1289 } catch (AssertionError ae) { 1290 // this is expected behavior 1291 } 1292 1293 // a major compaction should work though 1294 admin.majorCompact(TABLE_NAMES[0]); 1295 quickPoll(new Callable<Boolean>() { 1296 @Override 1297 public Boolean call() throws Exception { 1298 List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]); 1299 for (HRegion region : regions) { 1300 for (HStore store : region.getStores()) { 1301 store.closeAndArchiveCompactedFiles(); 1302 } 1303 } 1304 return fs.listStatus(storePath).length == 1; 1305 } 1306 }, 5000); 1307 1308 } finally { 1309 util.shutdownMiniCluster(); 1310 } 1311 } 1312 1313 @Ignore("Goes zombie too frequently; needs work. 
See HBASE-14563") 1314 @Test 1315 public void testExcludeMinorCompaction() throws Exception { 1316 Configuration conf = util.getConfiguration(); 1317 conf.setInt("hbase.hstore.compaction.min", 2); 1318 generateRandomStartKeys(5); 1319 1320 util.startMiniCluster(); 1321 try (Connection conn = ConnectionFactory.createConnection(conf); 1322 Admin admin = conn.getAdmin()) { 1323 Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction"); 1324 final FileSystem fs = util.getDFSCluster().getFileSystem(); 1325 Table table = util.createTable(TABLE_NAMES[0], FAMILIES); 1326 assertEquals("Should start with empty table", 0, util.countRows(table)); 1327 1328 // deep inspection: get the StoreFile dir 1329 final Path storePath = 1330 new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]), 1331 new Path(admin.getRegions(TABLE_NAMES[0]).get(0).getEncodedName(), 1332 Bytes.toString(FAMILIES[0]))); 1333 assertEquals(0, fs.listStatus(storePath).length); 1334 1335 // put some data in it and flush to create a storefile 1336 Put p = new Put(Bytes.toBytes("test")); 1337 p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1")); 1338 table.put(p); 1339 admin.flush(TABLE_NAMES[0]); 1340 assertEquals(1, util.countRows(table)); 1341 quickPoll(new Callable<Boolean>() { 1342 @Override 1343 public Boolean call() throws Exception { 1344 return fs.listStatus(storePath).length == 1; 1345 } 1346 }, 5000); 1347 1348 // Generate a bulk load file with more rows 1349 conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true); 1350 1351 RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]); 1352 runIncrementalPELoad(conf, 1353 Arrays.asList(new HFileOutputFormat2.TableInfo(table.getDescriptor(), regionLocator)), 1354 testDir, false); 1355 1356 // Perform the actual load 1357 BulkLoadHFiles.create(conf).bulkLoad(table.getName(), testDir); 1358 1359 // Ensure data shows up 1360 int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT; 1361 assertEquals("BulkLoadHFiles should put expected data in table", expectedRows + 1, 1362 util.countRows(table)); 1363 1364 // should have a second StoreFile now 1365 assertEquals(2, fs.listStatus(storePath).length); 1366 1367 // minor compactions shouldn't get rid of the file 1368 admin.compact(TABLE_NAMES[0]); 1369 try { 1370 quickPoll(new Callable<Boolean>() { 1371 @Override 1372 public Boolean call() throws Exception { 1373 return fs.listStatus(storePath).length == 1; 1374 } 1375 }, 5000); 1376 throw new IOException("SF# = " + fs.listStatus(storePath).length); 1377 } catch (AssertionError ae) { 1378 // this is expected behavior 1379 } 1380 1381 // a major compaction should work though 1382 admin.majorCompact(TABLE_NAMES[0]); 1383 quickPoll(new Callable<Boolean>() { 1384 @Override 1385 public Boolean call() throws Exception { 1386 return fs.listStatus(storePath).length == 1; 1387 } 1388 }, 5000); 1389 1390 } finally { 1391 util.shutdownMiniCluster(); 1392 } 1393 } 1394 1395 private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception { 1396 int sleepMs = 10; 1397 int retries = (int) Math.ceil(((double) waitMs) / sleepMs); 1398 while (retries-- > 0) { 1399 if (c.call().booleanValue()) { 1400 return; 1401 } 1402 Thread.sleep(sleepMs); 1403 } 1404 fail(); 1405 } 1406 1407 public static void main(String args[]) throws Exception { 1408 new TestHFileOutputFormat2().manualTest(args); 1409 } 1410 1411 public void manualTest(String args[]) throws Exception { 1412 Configuration conf = 
  public void manualTest(String args[]) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    util = new HBaseTestingUtil(conf);
    if ("newtable".equals(args[0])) {
      TableName tname = TableName.valueOf(args[1]);
      byte[][] splitKeys = generateRandomSplitKeys(4);
      util.createTable(tname, FAMILIES, splitKeys);
    } else if ("incremental".equals(args[0])) {
      TableName tname = TableName.valueOf(args[1]);
      try (Connection c = ConnectionFactory.createConnection(conf); Admin admin = c.getAdmin();
        RegionLocator regionLocator = c.getRegionLocator(tname)) {
        Path outDir = new Path("incremental-out");
        runIncrementalPELoad(conf,
          Arrays
            .asList(new HFileOutputFormat2.TableInfo(admin.getDescriptor(tname), regionLocator)),
          outDir, false);
      }
    } else {
      throw new RuntimeException("usage: TestHFileOutputFormat2 newtable | incremental");
    }
  }

  @Test
  public void testBlockStoragePolicy() throws Exception {
    util = new HBaseTestingUtil();
    Configuration conf = util.getConfiguration();
    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");

    conf.set(
      HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes
        .toString(HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0])),
      "ONE_SSD");
    Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
    Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
    util.startMiniDFSCluster(3);
    FileSystem fs = util.getDFSCluster().getFileSystem();
    try {
      fs.mkdirs(cf1Dir);
      fs.mkdirs(cf2Dir);

      // the default block storage policy is HOT
      String spA = getStoragePolicyName(fs, cf1Dir);
      String spB = getStoragePolicyName(fs, cf2Dir);
      LOG.debug("Storage policy of cf 0: [" + spA + "].");
      LOG.debug("Storage policy of cf 1: [" + spB + "].");
      assertEquals("HOT", spA);
      assertEquals("HOT", spB);

      // apply the configured storage policies to the column family directories
      HFileOutputFormat2.configureStoragePolicy(conf, fs,
        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir);
      HFileOutputFormat2.configureStoragePolicy(conf, fs,
        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir);
      spA = getStoragePolicyName(fs, cf1Dir);
      spB = getStoragePolicyName(fs, cf2Dir);
      LOG.debug("Storage policy of cf 0: [" + spA + "].");
      LOG.debug("Storage policy of cf 1: [" + spB + "].");
      assertNotNull(spA);
      assertEquals("ONE_SSD", spA);
      assertNotNull(spB);
      assertEquals("ALL_SSD", spB);
    } finally {
      fs.delete(cf1Dir, true);
      fs.delete(cf2Dir, true);
      util.shutdownMiniDFSCluster();
    }
  }

  private String getStoragePolicyName(FileSystem fs, Path path) {
    try {
      Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path);
      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
    } catch (Exception e) {
      // Maybe it failed because we are on an old HDFS version; try the old way
      if (LOG.isTraceEnabled()) {
        LOG.trace("Failed to get policy directly", e);
      }
      String policy = getStoragePolicyNameForOldHDFSVersion(fs, path);
      return policy == null ? "HOT" : policy; // HOT by default
    }
  }
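
  /**
   * Fallback for older HDFS versions whose FileSystem API does not expose getStoragePolicy():
   * reads the file's storage policy id through the DFSClient and resolves it against the policies
   * reported by the NameNode. Returns null when the policy cannot be determined.
   */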
"HOT" : policy;// HOT by default 1490 } 1491 } 1492 1493 private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) { 1494 try { 1495 if (fs instanceof DistributedFileSystem) { 1496 DistributedFileSystem dfs = (DistributedFileSystem) fs; 1497 HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath()); 1498 if (null != status) { 1499 byte storagePolicyId = status.getStoragePolicy(); 1500 Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED"); 1501 if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) { 1502 BlockStoragePolicy[] policies = dfs.getStoragePolicies(); 1503 for (BlockStoragePolicy policy : policies) { 1504 if (policy.getId() == storagePolicyId) { 1505 return policy.getName(); 1506 } 1507 } 1508 } 1509 } 1510 } 1511 } catch (Throwable e) { 1512 LOG.warn("failed to get block storage policy of [" + path + "]", e); 1513 } 1514 1515 return null; 1516 } 1517 1518 @Test 1519 public void testConfigurePartitioner() throws Exception { 1520 util.startMiniDFSCluster(1); 1521 try { 1522 Configuration conf = util.getConfiguration(); 1523 // Create a user who is not the current user 1524 String fooUserName = "foo1234"; 1525 String fooGroupName = "group1"; 1526 UserGroupInformation ugi = 1527 UserGroupInformation.createUserForTesting(fooUserName, new String[] { fooGroupName }); 1528 // Get user's home directory 1529 Path fooHomeDirectory = ugi.doAs(new PrivilegedAction<Path>() { 1530 @Override 1531 public Path run() { 1532 try (FileSystem fs = FileSystem.get(conf)) { 1533 return fs.makeQualified(fs.getHomeDirectory()); 1534 } catch (IOException ioe) { 1535 LOG.error("Failed to get foo's home directory", ioe); 1536 } 1537 return null; 1538 } 1539 }); 1540 // create the home directory and chown 1541 FileSystem fs = FileSystem.get(conf); 1542 fs.mkdirs(fooHomeDirectory); 1543 fs.setOwner(fooHomeDirectory, fooUserName, fooGroupName); 1544 1545 Job job = Mockito.mock(Job.class); 1546 Mockito.doReturn(conf).when(job).getConfiguration(); 1547 ImmutableBytesWritable writable = new ImmutableBytesWritable(); 1548 List<ImmutableBytesWritable> splitPoints = new ArrayList<ImmutableBytesWritable>(); 1549 splitPoints.add(writable); 1550 1551 ugi.doAs(new PrivilegedAction<Void>() { 1552 @Override 1553 public Void run() { 1554 try { 1555 HFileOutputFormat2.configurePartitioner(job, splitPoints, false); 1556 } catch (IOException ioe) { 1557 LOG.error("Failed to configure partitioner", ioe); 1558 } 1559 return null; 1560 } 1561 }); 1562 // verify that the job uses TotalOrderPartitioner 1563 verify(job).setPartitionerClass(TotalOrderPartitioner.class); 1564 // verify that TotalOrderPartitioner.setPartitionFile() is called. 1565 String partitionPathString = conf.get("mapreduce.totalorderpartitioner.path"); 1566 Assert.assertNotNull(partitionPathString); 1567 // Make sure the partion file is in foo1234's home directory, and that 1568 // the file exists. 
      Assert.assertTrue(partitionPathString.startsWith(fooHomeDirectory.toString()));
      Assert.assertTrue(fs.exists(new Path(partitionPathString)));
    } finally {
      util.shutdownMiniDFSCluster();
    }
  }

  @Test
  public void testConfigureCompression() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("TestConfigureCompression");
    String hfileoutputformatCompression = "gz";

    try {
      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);

      conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY, hfileoutputformatCompression);

      Job job = Job.getInstance(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b);
      writer.write(new ImmutableBytesWritable(), kv);
      writer.close(context);
      writer = null;
      FileSystem fs = dir.getFileSystem(conf);
      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        HFile.Reader reader =
          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
        assertEquals(hfileoutputformatCompression,
          reader.getTrailer().getCompressionCodec().getName());
      }
    } finally {
      if (writer != null && context != null) {
        writer.close(context);
      }
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  @Test
  public void testMRIncrementalLoadWithLocalityMultiCluster() throws Exception {
    // Start cluster A
    util = new HBaseTestingUtil();
    Configuration confA = util.getConfiguration();
    int hostCount = 3;
    int regionNum = 20;
    String[] hostnames = new String[hostCount];
    for (int i = 0; i < hostCount; ++i) {
      hostnames[i] = "datanode_" + i;
    }
    StartTestingClusterOption option = StartTestingClusterOption.builder()
      .numRegionServers(hostCount).dataNodeHosts(hostnames).build();
    util.startMiniCluster(option);

    // Start cluster B
    HBaseTestingUtil utilB = new HBaseTestingUtil();
    Configuration confB = utilB.getConfiguration();
    utilB.startMiniCluster(option);

    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");

    byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
    TableName tableName = TableName.valueOf("table");
    // Create table in cluster B
    try (Table table = utilB.createTable(tableName, FAMILIES, splitKeys);
      RegionLocator r = utilB.getConnection().getRegionLocator(tableName)) {
      // Generate the bulk load files.
      // The job carries the ZooKeeper configuration for cluster A; assume we read from cluster A
      // (as TableInputFormat would) and create HFiles for cluster B.
      Job job = new Job(confA, "testLocalMRIncrementalLoad");
      Configuration jobConf = job.getConfiguration();
      final UUID key = ConfigurationCaptorConnection.configureConnectionImpl(jobConf);
      job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
      setupRandomGeneratorMapper(job, false);
      HFileOutputFormat2.configureIncrementalLoad(job, table, r);

      assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY));
      assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY));
      assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY));

      String bSpecificConfigKey = "my.override.config.for.b";
      String bSpecificConfigValue = "b-specific-value";
      jobConf.set(HFileOutputFormat2.REMOTE_CLUSTER_CONF_PREFIX + bSpecificConfigKey,
        bSpecificConfigValue);

      FileOutputFormat.setOutputPath(job, testDir);

      assertFalse(util.getTestFileSystem().exists(testDir));

      assertTrue(job.waitForCompletion(true));

      final List<Configuration> configs =
        ConfigurationCaptorConnection.getCapturedConfigurations(key);

      assertFalse(configs.isEmpty());
      for (Configuration config : configs) {
        assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
          config.get(HConstants.ZOOKEEPER_QUORUM));
        assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
          config.get(HConstants.ZOOKEEPER_CLIENT_PORT));
        assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
          config.get(HConstants.ZOOKEEPER_ZNODE_PARENT));

        assertEquals(bSpecificConfigValue, config.get(bSpecificConfigKey));
      }
    } finally {
      utilB.deleteTable(tableName);
      testDir.getFileSystem(confA).delete(testDir, true);
      util.shutdownMiniCluster();
      utilB.shutdownMiniCluster();
    }
  }
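
  /**
   * A {@link Connection} wrapper, registered through the client connection implementation setting,
   * that records every Configuration it is constructed with, keyed by a per-test UUID, so the test
   * above can assert that the remote-cluster overrides reached the connections created by the job.
   * All calls are delegated to a regular connection.
   */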
  private static class ConfigurationCaptorConnection implements Connection {
    private static final String UUID_KEY = "ConfigurationCaptorConnection.uuid";

    private static final Map<UUID, List<Configuration>> confs = new ConcurrentHashMap<>();

    private final Connection delegate;

    public ConfigurationCaptorConnection(Configuration conf, ExecutorService es, User user,
      ConnectionRegistry registry, Map<String, byte[]> connectionAttributes) throws IOException {
      // here we do not use this registry, so close it
      registry.close();
      // use createAsyncConnection here to avoid infinite recursion, as we reset the Connection
      // implementation in configureConnectionImpl below
      delegate =
        FutureUtils.get(ConnectionFactory.createAsyncConnection(conf, user, connectionAttributes))
          .toConnection();

      final String uuid = conf.get(UUID_KEY);
      if (uuid != null) {
        confs.computeIfAbsent(UUID.fromString(uuid), u -> new CopyOnWriteArrayList<>()).add(conf);
      }
    }

    static UUID configureConnectionImpl(Configuration conf) {
      conf.setClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
        ConfigurationCaptorConnection.class, Connection.class);

      final UUID uuid = UUID.randomUUID();
      conf.set(UUID_KEY, uuid.toString());
      return uuid;
    }

    static List<Configuration> getCapturedConfigurations(UUID key) {
      return confs.get(key);
    }

    @Override
    public Configuration getConfiguration() {
      return delegate.getConfiguration();
    }

    @Override
    public Table getTable(TableName tableName) throws IOException {
      return delegate.getTable(tableName);
    }

    @Override
    public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
      return delegate.getTable(tableName, pool);
    }

    @Override
    public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
      return delegate.getBufferedMutator(tableName);
    }

    @Override
    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
      return delegate.getBufferedMutator(params);
    }

    @Override
    public RegionLocator getRegionLocator(TableName tableName) throws IOException {
      return delegate.getRegionLocator(tableName);
    }

    @Override
    public void clearRegionLocationCache() {
      delegate.clearRegionLocationCache();
    }

    @Override
    public Admin getAdmin() throws IOException {
      return delegate.getAdmin();
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }

    @Override
    public boolean isClosed() {
      return delegate.isClosed();
    }

    @Override
    public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
      return delegate.getTableBuilder(tableName, pool);
    }

    @Override
    public AsyncConnection toAsyncConnection() {
      return delegate.toAsyncConnection();
    }

    @Override
    public String getClusterId() {
      return delegate.getClusterId();
    }

    @Override
    public Hbck getHbck() throws IOException {
      return delegate.getHbck();
    }

    @Override
    public Hbck getHbck(ServerName masterServer) throws IOException {
      return delegate.getHbck(masterServer);
    }

    @Override
    public void abort(String why, Throwable e) {
      delegate.abort(why, e);
    }

    @Override
    public boolean isAborted() {
      return delegate.isAborted();
    }
  }

}