Skip to content

Commit cb6011e

Browse files
committed
Merge pull request pinpoint-apm#1363 from Xylus/feature/issue-1362
Add parallel result scanner
2 parents 8f8bd82 + 67862a0 commit cb6011e

File tree

12 files changed

+588
-192
lines changed

12 files changed

+588
-192
lines changed
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright 2015 NAVER Corp.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.navercorp.pinpoint.common.hbase;
18+
19+
import com.navercorp.pinpoint.common.util.StopWatch;
20+
import com.sematext.hbase.wd.AbstractRowKeyDistributor;
21+
import org.apache.hadoop.hbase.client.HTableInterface;
22+
import org.apache.hadoop.hbase.client.ResultScanner;
23+
import org.apache.hadoop.hbase.client.Scan;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
import org.springframework.data.hadoop.hbase.ResultsExtractor;
27+
import org.springframework.data.hadoop.hbase.TableCallback;
28+
29+
import java.io.IOException;
30+
31+
/**
32+
* @author HyunGil Jeong
33+
*/
34+
public abstract class AbstractDistributedScannerTableCallback<T> implements TableCallback<T> {
35+
36+
protected final Logger logger = LoggerFactory.getLogger(getClass());
37+
protected final boolean debugEnabled = this.logger.isDebugEnabled();
38+
39+
protected final Scan scan;
40+
protected final AbstractRowKeyDistributor rowKeyDistributor;
41+
protected final ResultsExtractor<T> resultsExtractor;
42+
43+
protected AbstractDistributedScannerTableCallback(Scan scan, AbstractRowKeyDistributor rowKeyDistributor, ResultsExtractor<T> resultsExtractor) {
44+
this.scan = scan;
45+
this.rowKeyDistributor = rowKeyDistributor;
46+
this.resultsExtractor = resultsExtractor;
47+
}
48+
49+
@Override
50+
public T doInTable(HTableInterface table) throws Throwable {
51+
StopWatch watch = null;
52+
if (debugEnabled) {
53+
watch = new StopWatch();
54+
watch.start();
55+
}
56+
final ResultScanner[] splitScanners = splitScanners(table);
57+
final ResultScanner scanner = createResultScanner(splitScanners);
58+
if (debugEnabled) {
59+
logger.debug("DistributedScanner createTime:{}", watch.stop());
60+
watch.start();
61+
}
62+
try {
63+
return resultsExtractor.extractData(scanner);
64+
} finally {
65+
scanner.close();
66+
if (debugEnabled) {
67+
logger.debug("DistributedScanner scanTime:{}", watch.stop());
68+
}
69+
}
70+
}
71+
72+
protected abstract ResultScanner createResultScanner(ResultScanner[] scanners) throws IOException;
73+
74+
private ResultScanner[] splitScanners(HTableInterface htable) throws IOException {
75+
Scan[] scans = this.rowKeyDistributor.getDistributedScans(this.scan);
76+
final int length = scans.length;
77+
for(int i = 0; i < length; i++) {
78+
Scan scan = scans[i];
79+
// other properties are already set upon construction
80+
scan.setId(this.scan.getId() + "-" + i);
81+
}
82+
83+
ResultScanner[] scanners = new ResultScanner[length];
84+
boolean success = false;
85+
try {
86+
for (int i = 0; i < length; i++) {
87+
scanners[i] = htable.getScanner(scans[i]);
88+
}
89+
success = true;
90+
} finally {
91+
if (!success) {
92+
closeScanner(scanners);
93+
}
94+
}
95+
return scanners;
96+
}
97+
98+
private void closeScanner(ResultScanner[] scannerList ) {
99+
for (ResultScanner scanner : scannerList) {
100+
if (scanner != null) {
101+
try {
102+
scanner.close();
103+
} catch (Exception e) {
104+
logger.warn("Scanner.close() error Caused:{}", e.getMessage(), e);
105+
}
106+
}
107+
}
108+
}
109+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2015 NAVER Corp.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.navercorp.pinpoint.common.hbase;
18+
19+
import com.sematext.hbase.wd.AbstractRowKeyDistributor;
20+
import com.sematext.hbase.wd.DistributedScanner;
21+
import org.apache.hadoop.hbase.client.ResultScanner;
22+
import org.apache.hadoop.hbase.client.Scan;
23+
import org.springframework.data.hadoop.hbase.ResultsExtractor;
24+
25+
import java.io.IOException;
26+
27+
/**
28+
* @author HyunGil Jeong
29+
*/
30+
public class DistributedScannerTableCallback<T> extends AbstractDistributedScannerTableCallback<T> {
31+
32+
protected DistributedScannerTableCallback(Scan scan, AbstractRowKeyDistributor rowKeyDistributor, ResultsExtractor<T> resultsExtractor) {
33+
super(scan, rowKeyDistributor, resultsExtractor);
34+
}
35+
36+
@Override
37+
protected ResultScanner createResultScanner(ResultScanner[] scanners) throws IOException {
38+
return new DistributedScanner(super.rowKeyDistributor, scanners);
39+
}
40+
41+
}

‎commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HbaseOperations2.java‎

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,14 +86,26 @@ public interface HbaseOperations2 extends HbaseOperations {
8686

8787
<T> List<List<T>> find(String tableName, final List<Scan> scans, final RowMapper<T> action);
8888

89-
<T> List<T> find(String tableName, final Scan scan, AbstractRowKeyDistributor rowKeyDistributor, final RowMapper<T> action);
89+
// Distributed scanners
9090

91-
<T> List<T> find(String tableName, final Scan scan, AbstractRowKeyDistributor rowKeyDistributor, int limit, final RowMapper<T> action);
91+
<T> List<T> find(String tableName, final Scan scan, final AbstractRowKeyDistributor rowKeyDistributor, final RowMapper<T> action);
92+
93+
<T> List<T> find(String tableName, final Scan scan, final AbstractRowKeyDistributor rowKeyDistributor, int limit, final RowMapper<T> action);
9294

9395
<T> List<T> find(String tableName, final Scan scan, final AbstractRowKeyDistributor rowKeyDistributor, int limit, final RowMapper<T> action, final LimitEventHandler limitEventHandler);
94-
96+
9597
<T> T find(String tableName, final Scan scan, final AbstractRowKeyDistributor rowKeyDistributor, final ResultsExtractor<T> action);
9698

99+
// Parallel scanners for distributed scans
100+
101+
<T> List<T> findParallel(String tableName, final Scan scan, final AbstractRowKeyDistributor rowKeyDistributor, final RowMapper<T> action, int numParallelThreads);
102+
103+
<T> List<T> findParallel(String tableName, final Scan scan, final AbstractRowKeyDistributor rowKeyDistributor, int limit, final RowMapper<T> action, int numParallelThreads);
104+
105+
<T> List<T> findParallel(String tableName, final Scan scan, final AbstractRowKeyDistributor rowKeyDistributor, int limit, final RowMapper<T> action, final LimitEventHandler limitEventHandler, int numParallelThreads);
106+
107+
<T> T findParallel(String tableName, final Scan scan, final AbstractRowKeyDistributor rowKeyDistributor, final ResultsExtractor<T> action, int numParallelThreads);
108+
97109
Result increment(String tableName, final Increment increment);
98110

99111
/**

0 commit comments

Comments
 (0)