|
@@ -36,8 +36,6 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.Executors;
|
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
-import java.util.concurrent.locks.Lock;
|
|
|
-import java.util.concurrent.locks.ReentrantLock;
|
|
|
|
|
|
import static org.apache.dolphinscheduler.common.Constants.COMMA;
|
|
|
|
|
@@ -70,11 +68,6 @@ public class LowerWeightHostManager extends CommonHostManager {
|
|
|
*/
|
|
|
private ConcurrentHashMap<String, Set<HostWeight>> workerHostWeights;
|
|
|
|
|
|
-
|
|
|
- * worker group host lock
|
|
|
- */
|
|
|
- private Lock lock;
|
|
|
-
|
|
|
|
|
|
* executor service
|
|
|
*/
|
|
@@ -84,7 +77,6 @@ public class LowerWeightHostManager extends CommonHostManager {
|
|
|
public void init(){
|
|
|
this.selector = new LowerWeightRoundRobin();
|
|
|
this.workerHostWeights = new ConcurrentHashMap<>();
|
|
|
- this.lock = new ReentrantLock();
|
|
|
this.executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LowerWeightHostManagerExecutor"));
|
|
|
this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(),35, 40, TimeUnit.SECONDS);
|
|
|
this.roundRobinHostManager = new RoundRobinHostManager();
|
|
@@ -116,23 +108,13 @@ public class LowerWeightHostManager extends CommonHostManager {
|
|
|
throw new UnsupportedOperationException("not support");
|
|
|
}
|
|
|
|
|
|
- private void syncWorkerHostWeight(Map<String, Set<HostWeight>> workerHostWeights){
|
|
|
- lock.lock();
|
|
|
- try {
|
|
|
- workerHostWeights.clear();
|
|
|
- workerHostWeights.putAll(workerHostWeights);
|
|
|
- } finally {
|
|
|
- lock.unlock();
|
|
|
- }
|
|
|
+ private void syncWorkerHostWeight(Map<String, Set<HostWeight>> workerHostWeights) {
|
|
|
+ this.workerHostWeights.clear();
|
|
|
+ this.workerHostWeights.putAll(workerHostWeights);
|
|
|
}
|
|
|
|
|
|
- private Set<HostWeight> getWorkerHostWeights(String workerGroup){
|
|
|
- lock.lock();
|
|
|
- try {
|
|
|
- return workerHostWeights.get(workerGroup);
|
|
|
- } finally {
|
|
|
- lock.unlock();
|
|
|
- }
|
|
|
+ private Set<HostWeight> getWorkerHostWeights(String workerGroup) {
|
|
|
+ return workerHostWeights.get(workerGroup);
|
|
|
}
|
|
|
|
|
|
class RefreshResourceTask implements Runnable{
|