chore(project): 添加项目配置文件和忽略规则

- 添加 Babel 配置文件支持 ES6+ 语法转换
- 添加 ESLint 忽略规则和配置文件
- 添加 Git 忽略规则文件
- 添加 Travis CI 配置文件
- 添加 1.4.2 版本变更日志文件
- 添加 Helm 图表辅助模板文件
- 添加 Helm 忽略规则文件
This commit is contained in:
2026-03-27 17:36:48 +08:00
commit c2453d6434
1703 changed files with 277582 additions and 0 deletions

42
discovery/pom.xml Normal file
View File

@@ -0,0 +1,42 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 1999-2019 Seata.io Group.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.seata</groupId>
<artifactId>seata-parent</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<artifactId>seata-discovery</artifactId>
<name>seata-discovery ${project.version}</name>
<modules>
<module>seata-discovery-consul</module>
<module>seata-discovery-core</module>
<module>seata-discovery-custom</module>
<module>seata-discovery-all</module>
<module>seata-discovery-eureka</module>
<module>seata-discovery-zk</module>
<module>seata-discovery-redis</module>
<module>seata-discovery-nacos</module>
<module>seata-discovery-etcd3</module>
<module>seata-discovery-sofa</module>
</modules>
</project>

View File

@@ -0,0 +1,70 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 1999-2019 Seata.io Group.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.seata</groupId>
<artifactId>seata-discovery</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>seata-discovery-all</artifactId>
<name>seata-discovery-all ${project.version}</name>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>seata-discovery-consul</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>seata-discovery-custom</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>seata-discovery-eureka</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>seata-discovery-zk</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>seata-discovery-redis</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>seata-discovery-nacos</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>seata-discovery-etcd3</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>seata-discovery-sofa</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,47 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 1999-2019 Seata.io Group.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.seata</groupId>
<artifactId>seata-discovery</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>seata-discovery-consul</artifactId>
<name>seata-discovery-consul ${project.version}</name>
<dependencies>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-discovery-core</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.ecwid.consul</groupId>
<artifactId>consul-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,32 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry.consul;
import com.ecwid.consul.v1.health.model.HealthService;
import java.util.List;
/**
* @author xingfudeshi@gmail.com
*/
public interface ConsulListener {
/**
* on event
*
* @param services
*/
void onEvent(List<HealthService> services);
}

View File

@@ -0,0 +1,31 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry.consul;
import io.seata.common.loader.LoadLevel;
import io.seata.discovery.registry.RegistryProvider;
import io.seata.discovery.registry.RegistryService;
/**
* @author xingfudeshi@gmail.com
*/
@LoadLevel(name = "Consul", order = 1)
public class ConsulRegistryProvider implements RegistryProvider {
@Override
public RegistryService provide() {
return ConsulRegistryServiceImpl.getInstance();
}
}

View File

@@ -0,0 +1,348 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry.consul;
import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.agent.model.NewService;
import com.ecwid.consul.v1.health.HealthServicesRequest;
import com.ecwid.consul.v1.health.model.HealthService;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.util.NetUtil;
import io.seata.config.Configuration;
import io.seata.config.ConfigurationFactory;
import io.seata.config.ConfigurationKeys;
import io.seata.discovery.registry.RegistryService;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* @author xingfudeshi@gmail.com
*/
public class ConsulRegistryServiceImpl implements RegistryService<ConsulListener> {
private static volatile ConsulRegistryServiceImpl instance;
private static volatile ConsulClient client;
private static final Logger LOGGER = LoggerFactory.getLogger(ConsulRegistryServiceImpl.class);
private static final Configuration FILE_CONFIG = ConfigurationFactory.CURRENT_FILE_INSTANCE;
private static final String FILE_ROOT_REGISTRY = "registry";
private static final String FILE_CONFIG_SPLIT_CHAR = ".";
private static final String REGISTRY_TYPE = "consul";
private static final String SERVER_ADDR_KEY = "serverAddr";
private static final String REGISTRY_CLUSTER = "cluster";
private static final String DEFAULT_CLUSTER_NAME = "default";
private static final String SERVICE_TAG = "services";
private static final String ACL_TOKEN = "aclToken";
private static final String FILE_CONFIG_KEY_PREFIX = FILE_ROOT_REGISTRY + FILE_CONFIG_SPLIT_CHAR + REGISTRY_TYPE + FILE_CONFIG_SPLIT_CHAR;
private ConcurrentMap<String, List<InetSocketAddress>> clusterAddressMap;
private ConcurrentMap<String, Set<ConsulListener>> listenerMap;
private ExecutorService notifierExecutor;
private ConcurrentMap<String, ConsulNotifier> notifiers;
private static final int THREAD_POOL_NUM = 1;
private static final int MAP_INITIAL_CAPACITY = 8;
/**
* default tcp check interval
*/
private static final String DEFAULT_CHECK_INTERVAL = "10s";
/**
* default tcp check timeout
*/
private static final String DEFAULT_CHECK_TIMEOUT = "1s";
/**
* default deregister critical server after
*/
private static final String DEFAULT_DEREGISTER_TIME = "20s";
/**
* default watch timeout in second
*/
private static final int DEFAULT_WATCH_TIMEOUT = 60;
private ConsulRegistryServiceImpl() {
//initial the capacity with 8
clusterAddressMap = new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY);
listenerMap = new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY);
notifiers = new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY);
notifierExecutor = new ThreadPoolExecutor(THREAD_POOL_NUM, THREAD_POOL_NUM,
Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
new NamedThreadFactory("services-consul-notifier", THREAD_POOL_NUM));
}
/**
* get instance of ConsulRegistryServiceImpl
*
* @return instance
*/
static ConsulRegistryServiceImpl getInstance() {
if (instance == null) {
synchronized (ConsulRegistryServiceImpl.class) {
if (instance == null) {
instance = new ConsulRegistryServiceImpl();
}
}
}
return instance;
}
@Override
public void register(InetSocketAddress address) throws Exception {
NetUtil.validAddress(address);
getConsulClient().agentServiceRegister(createService(address), getAclToken());
}
@Override
public void unregister(InetSocketAddress address) throws Exception {
NetUtil.validAddress(address);
getConsulClient().agentServiceDeregister(createServiceId(address), getAclToken());
}
@Override
public void subscribe(String cluster, ConsulListener listener) throws Exception {
//1.add listener to subscribe list
listenerMap.computeIfAbsent(cluster, key -> new HashSet<>())
.add(listener);
//2.get healthy services
Response<List<HealthService>> response = getHealthyServices(cluster, -1, DEFAULT_WATCH_TIMEOUT);
//3.get current consul index.
Long index = response.getConsulIndex();
ConsulNotifier notifier = notifiers.computeIfAbsent(cluster, key -> new ConsulNotifier(cluster, index));
//4.run notifier
notifierExecutor.submit(notifier);
}
@Override
public void unsubscribe(String cluster, ConsulListener listener) throws Exception {
//1.remove notifier for the cluster
ConsulNotifier notifier = notifiers.remove(cluster);
//2.stop the notifier
notifier.stop();
}
@Override
public List<InetSocketAddress> lookup(String key) throws Exception {
final String cluster = getServiceGroup(key);
if (cluster == null) {
return null;
}
if (!listenerMap.containsKey(cluster)) {
//1.refresh cluster
refreshCluster(cluster);
//2. subscribe
subscribe(cluster, services -> refreshCluster(cluster, services));
}
return clusterAddressMap.get(cluster);
}
/**
* get consul client
*
* @return client
*/
private ConsulClient getConsulClient() {
if (client == null) {
synchronized (ConsulRegistryServiceImpl.class) {
if (client == null) {
String serverAddr = FILE_CONFIG.getConfig(FILE_CONFIG_KEY_PREFIX + SERVER_ADDR_KEY);
InetSocketAddress inetSocketAddress = NetUtil.toInetSocketAddress(serverAddr);
client = new ConsulClient(inetSocketAddress.getHostName(), inetSocketAddress.getPort());
}
}
}
return client;
}
/**
* get cluster name
*
* @return
*/
private String getClusterName() {
String clusterConfigName = String.join(FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY, REGISTRY_TYPE, REGISTRY_CLUSTER);
return FILE_CONFIG.getConfig(clusterConfigName, DEFAULT_CLUSTER_NAME);
}
/**
* create serviceId
*
* @param address
* @return serviceId
*/
private String createServiceId(InetSocketAddress address) {
return getClusterName() + "-" + NetUtil.toStringAddress(address);
}
/**
* get consul acl-token
*
* @return acl-token
*/
private static String getAclToken() {
String fileConfigKey = String.join(ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR, ConfigurationKeys.FILE_ROOT_REGISTRY, REGISTRY_TYPE, ACL_TOKEN);
String aclToken = StringUtils.isNotBlank(System.getProperty(ACL_TOKEN)) ? System.getProperty(ACL_TOKEN)
: FILE_CONFIG.getConfig(fileConfigKey);
return StringUtils.isNotBlank(aclToken) ? aclToken : null;
}
/**
* create a new service
*
* @param address
* @return newService
*/
private NewService createService(InetSocketAddress address) {
NewService newService = new NewService();
newService.setId(createServiceId(address));
newService.setName(getClusterName());
newService.setTags(Collections.singletonList(SERVICE_TAG));
newService.setPort(address.getPort());
newService.setAddress(NetUtil.toIpAddress(address));
newService.setCheck(createCheck(address));
return newService;
}
/**
* create service check based on TCP
*
* @param address
* @return
*/
private NewService.Check createCheck(InetSocketAddress address) {
NewService.Check check = new NewService.Check();
check.setTcp(NetUtil.toStringAddress(address));
check.setInterval(DEFAULT_CHECK_INTERVAL);
check.setTimeout(DEFAULT_CHECK_TIMEOUT);
check.setDeregisterCriticalServiceAfter(DEFAULT_DEREGISTER_TIME);
return check;
}
/**
* get healthy services
*
* @param service
* @return
*/
private Response<List<HealthService>> getHealthyServices(String service, long index, long watchTimeout) {
return getConsulClient().getHealthServices(service, HealthServicesRequest.newBuilder()
.setTag(SERVICE_TAG)
.setQueryParams(new QueryParams(watchTimeout, index))
.setPassing(true)
.setToken(getAclToken())
.build());
}
/**
* refresh cluster
*
* @param cluster
*/
private void refreshCluster(String cluster) {
if (cluster == null) {
return;
}
Response<List<HealthService>> response = getHealthyServices(getClusterName(), -1, -1);
if (response == null) {
return;
}
refreshCluster(cluster, response.getValue());
}
/**
* refresh cluster
*
* @param cluster
* @param services
*/
private void refreshCluster(String cluster, List<HealthService> services) {
if (cluster == null || services == null) {
return;
}
clusterAddressMap.put(cluster, services.stream()
.map(HealthService::getService)
.map(service -> new InetSocketAddress(service.getAddress(), service.getPort()))
.collect(Collectors.toList()));
}
/**
* consul notifier
*/
private class ConsulNotifier implements Runnable {
private String cluster;
private long consulIndex;
private boolean running;
private boolean hasError = false;
ConsulNotifier(String cluster, long consulIndex) {
this.cluster = cluster;
this.consulIndex = consulIndex;
this.running = true;
}
@Override
public void run() {
while (this.running) {
try {
processService();
} catch (Exception exception) {
hasError = true;
LOGGER.error("consul refresh services error:{}", exception.getMessage());
}
}
}
private void processService() {
Response<List<HealthService>> response = getHealthyServices(cluster, consulIndex, DEFAULT_WATCH_TIMEOUT);
Long currentIndex = response.getConsulIndex();
if ((currentIndex != null && currentIndex > consulIndex) || hasError) {
hasError = false;
List<HealthService> services = response.getValue();
consulIndex = currentIndex;
for (ConsulListener listener : listenerMap.get(cluster)) {
listener.onEvent(services);
}
}
}
void stop() {
this.running = false;
}
}
@Override
public void close() throws Exception {
client = null;
}
}

View File

@@ -0,0 +1 @@
io.seata.discovery.registry.consul.ConsulRegistryProvider

View File

@@ -0,0 +1,63 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry.consul;
import io.seata.discovery.registry.RegistryService;
import org.junit.jupiter.api.Test;
import java.net.InetSocketAddress;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
/**
* @author xingfudeshi@gmail.com
*/
public class ConsulRegistryServiceImplTest {
@Test
public void testRegister() throws Exception {
RegistryService registryService = mock(ConsulRegistryServiceImpl.class);
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 8091);
registryService.register(inetSocketAddress);
verify(registryService).register(inetSocketAddress);
}
@Test
public void testUnregister() throws Exception {
RegistryService registryService = mock(ConsulRegistryServiceImpl.class);
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 8091);
registryService.unregister(inetSocketAddress);
verify(registryService).unregister(inetSocketAddress);
}
@Test
public void testSubscribe() throws Exception {
RegistryService registryService = mock(ConsulRegistryServiceImpl.class);
ConsulListener consulListener = mock(ConsulListener.class);
registryService.subscribe("test", consulListener);
verify(registryService).subscribe("test", consulListener);
}
@Test
public void testLookup() throws Exception {
RegistryService registryService = mock(ConsulRegistryServiceImpl.class);
registryService.lookup("test-key");
verify(registryService).lookup("test-key");
}
}

View File

@@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 1999-2019 Seata.io Group.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.seata</groupId>
<artifactId>seata-discovery</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>seata-discovery-core</artifactId>
<name>seata-discovery-core ${project.version}</name>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>seata-config-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,49 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.loadbalance;
import java.util.List;
import io.seata.common.util.CollectionUtils;
/**
* The type Abstract load balance.
*
* @author slievrly
*/
public abstract class AbstractLoadBalance implements LoadBalance {
@Override
public <T> T select(List<T> invokers, String xid) {
if (CollectionUtils.isEmpty(invokers)) {
return null;
}
if (invokers.size() == 1) {
return invokers.get(0);
}
return doSelect(invokers, xid);
}
/**
* Do select t.
*
* @param <T> the type parameter
* @param invokers the invokers
* @param xid the xid
* @return the t
*/
protected abstract <T> T doSelect(List<T> invokers, String xid);
}

View File

@@ -0,0 +1,102 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.loadbalance;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import io.seata.common.loader.LoadLevel;
import io.seata.config.ConfigurationFactory;
import static io.seata.common.DefaultValues.VIRTUAL_NODES_DEFAULT;
import static io.seata.discovery.loadbalance.LoadBalanceFactory.CONSISTENT_HASH_LOAD_BALANCE;
import static io.seata.discovery.loadbalance.LoadBalanceFactory.LOAD_BALANCE_PREFIX;
/**
* The type consistent hash load balance.
*
* @author ph3636
*/
@LoadLevel(name = CONSISTENT_HASH_LOAD_BALANCE)
public class ConsistentHashLoadBalance extends AbstractLoadBalance {
/**
* The constant LOAD_BALANCE_CONSISTENT_HASH_VISUAL_NODES.
*/
public static final String LOAD_BALANCE_CONSISTENT_HASH_VISUAL_NODES = LOAD_BALANCE_PREFIX + "visualNodes";
/**
* The constant VIRTUAL_NODES_NUM.
*/
private static final int VIRTUAL_NODES_NUM = ConfigurationFactory.getInstance().getInt(LOAD_BALANCE_CONSISTENT_HASH_VISUAL_NODES, VIRTUAL_NODES_DEFAULT);
@Override
protected <T> T doSelect(List<T> invokers, String xid) {
return new ConsistentHashSelector<>(invokers, VIRTUAL_NODES_NUM).select(xid);
}
private static final class ConsistentHashSelector<T> {
private final SortedMap<Long, T> virtualInvokers = new TreeMap<>();
private final HashFunction hashFunction = new MD5Hash();
ConsistentHashSelector(List<T> invokers, int virtualNodes) {
for (T invoker : invokers) {
for (int i = 0; i < virtualNodes; i++) {
virtualInvokers.put(hashFunction.hash(invoker.toString() + i), invoker);
}
}
}
public T select(String objectKey) {
SortedMap<Long, T> tailMap = virtualInvokers.tailMap(hashFunction.hash(objectKey));
Long nodeHashVal = tailMap.isEmpty() ? virtualInvokers.firstKey() : tailMap.firstKey();
return virtualInvokers.get(nodeHashVal);
}
}
private static class MD5Hash implements HashFunction {
MessageDigest instance;
public MD5Hash() {
try {
instance = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
@Override
public long hash(String key) {
instance.reset();
instance.update(key.getBytes());
byte[] digest = instance.digest();
long h = 0;
for (int i = 0; i < 4; i++) {
h <<= 8;
h |= ((int) digest[i]) & 0xFF;
}
return h;
}
}
/**
* Hash String to long value
*/
public interface HashFunction {
long hash(String key);
}
}

View File

@@ -0,0 +1,55 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.loadbalance;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import io.seata.common.loader.LoadLevel;
import io.seata.common.rpc.RpcStatus;
import static io.seata.discovery.loadbalance.LoadBalanceFactory.LEAST_ACTIVE_LOAD_BALANCE;
/**
* The type Least Active load balance.
*
* @author ph3636
*/
@LoadLevel(name = LEAST_ACTIVE_LOAD_BALANCE)
public class LeastActiveLoadBalance extends AbstractLoadBalance {
@Override
protected <T> T doSelect(List<T> invokers, String xid) {
int length = invokers.size();
long leastActive = -1;
int leastCount = 0;
int[] leastIndexes = new int[length];
for (int i = 0; i < length; i++) {
long active = RpcStatus.getStatus(invokers.get(i).toString()).getActive();
if (leastActive == -1 || active < leastActive) {
leastActive = active;
leastCount = 1;
leastIndexes[0] = i;
} else if (active == leastActive) {
leastIndexes[leastCount++] = i;
}
}
if (leastCount == 1) {
return invokers.get(leastIndexes[0]);
}
return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}
}

View File

@@ -0,0 +1,37 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.loadbalance;
import java.util.List;
/**
* The interface Load balance.
*
* @author slievrly
*/
public interface LoadBalance {
/**
* Select t.
*
* @param <T> the type parameter
* @param invokers the invokers
* @param xid the xid
* @return the t
* @throws Exception the exception
*/
<T> T select(List<T> invokers, String xid) throws Exception;
}

View File

@@ -0,0 +1,56 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.loadbalance;
import io.seata.common.loader.EnhancedServiceLoader;
import io.seata.config.ConfigurationFactory;
import static io.seata.common.DefaultValues.DEFAULT_LOAD_BALANCE;
/**
* The type Load balance factory.
*
* @author slievrly
*/
public class LoadBalanceFactory {
private static final String CLIENT_PREFIX = "client.";
/**
* The constant LOAD_BALANCE_PREFIX.
*/
public static final String LOAD_BALANCE_PREFIX = CLIENT_PREFIX + "loadBalance.";
public static final String LOAD_BALANCE_TYPE = LOAD_BALANCE_PREFIX + "type";
public static final String RANDOM_LOAD_BALANCE = DEFAULT_LOAD_BALANCE;
public static final String ROUND_ROBIN_LOAD_BALANCE = "RoundRobinLoadBalance";
public static final String CONSISTENT_HASH_LOAD_BALANCE = "ConsistentHashLoadBalance";
public static final String LEAST_ACTIVE_LOAD_BALANCE = "LeastActiveLoadBalance";
/**
* Get instance.
*
* @return the instance
*/
public static LoadBalance getInstance() {
String config = ConfigurationFactory.getInstance().getConfig(LOAD_BALANCE_TYPE, DEFAULT_LOAD_BALANCE);
return EnhancedServiceLoader.load(LoadBalance.class, config);
}
}

View File

@@ -0,0 +1,38 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.loadbalance;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import io.seata.common.loader.LoadLevel;
import static io.seata.discovery.loadbalance.LoadBalanceFactory.RANDOM_LOAD_BALANCE;
/**
* The type Random load balance.
*
* @author yuoyao
*/
@LoadLevel(name = RANDOM_LOAD_BALANCE)
public class RandomLoadBalance extends AbstractLoadBalance {
@Override
protected <T> T doSelect(List<T> invokers, String xid) {
int length = invokers.size();
return invokers.get(ThreadLocalRandom.current().nextInt(length));
}
}

View File

@@ -0,0 +1,51 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.loadbalance;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import io.seata.common.loader.LoadLevel;
import static io.seata.discovery.loadbalance.LoadBalanceFactory.ROUND_ROBIN_LOAD_BALANCE;
/**
* The type Round robin load balance.
*
* @author slievrly
*/
@LoadLevel(name = ROUND_ROBIN_LOAD_BALANCE)
public class RoundRobinLoadBalance extends AbstractLoadBalance {
private final AtomicInteger sequence = new AtomicInteger();
@Override
protected <T> T doSelect(List<T> invokers, String xid) {
int length = invokers.size();
return invokers.get(getPositiveSequence() % length);
}
private int getPositiveSequence() {
for (; ; ) {
int current = sequence.get();
int next = current >= Integer.MAX_VALUE ? 0 : current + 1;
if (sequence.compareAndSet(current, next)) {
return current;
}
}
}
}

View File

@@ -0,0 +1,106 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry;
import io.seata.common.util.StringUtils;
import io.seata.config.ConfigChangeListener;
import io.seata.config.Configuration;
import io.seata.config.ConfigurationFactory;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
/**
* The type File registry service.
*
* @author slievrly
*/
public class FileRegistryServiceImpl implements RegistryService<ConfigChangeListener> {
private static volatile FileRegistryServiceImpl instance;
private static final Configuration CONFIG = ConfigurationFactory.getInstance();
private static final String POSTFIX_GROUPLIST = ".grouplist";
private static final String ENDPOINT_SPLIT_CHAR = ";";
private static final String IP_PORT_SPLIT_CHAR = ":";
private FileRegistryServiceImpl() {
}
/**
* Gets instance.
*
* @return the instance
*/
static FileRegistryServiceImpl getInstance() {
if (instance == null) {
synchronized (FileRegistryServiceImpl.class) {
if (instance == null) {
instance = new FileRegistryServiceImpl();
}
}
}
return instance;
}
@Override
public void register(InetSocketAddress address) throws Exception {
}
@Override
public void unregister(InetSocketAddress address) throws Exception {
}
@Override
public void subscribe(String cluster, ConfigChangeListener listener) throws Exception {
}
@Override
public void unsubscribe(String cluster, ConfigChangeListener listener) throws Exception {
}
@Override
public List<InetSocketAddress> lookup(String key) throws Exception {
String clusterName = getServiceGroup(key);
if (clusterName == null) {
return null;
}
String endpointStr = CONFIG.getConfig(
PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + clusterName + POSTFIX_GROUPLIST);
if (StringUtils.isNullOrEmpty(endpointStr)) {
throw new IllegalArgumentException(clusterName + POSTFIX_GROUPLIST + " is required");
}
String[] endpoints = endpointStr.split(ENDPOINT_SPLIT_CHAR);
List<InetSocketAddress> inetSocketAddresses = new ArrayList<>();
for (String endpoint : endpoints) {
String[] ipAndPort = endpoint.split(IP_PORT_SPLIT_CHAR);
if (ipAndPort.length != 2) {
throw new IllegalArgumentException("endpoint format should like ip:port");
}
inetSocketAddresses.add(new InetSocketAddress(ipAndPort[0], Integer.parseInt(ipAndPort[1])));
}
return inetSocketAddresses;
}
@Override
public void close() throws Exception {
}
}

View File

@@ -0,0 +1,66 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry;
import java.util.Objects;
import io.seata.common.exception.NotSupportYetException;
import io.seata.common.loader.EnhancedServiceLoader;
import io.seata.config.ConfigurationFactory;
import io.seata.config.ConfigurationKeys;
/**
* The type Registry factory.
*
* @author slievrly
*/
public class RegistryFactory {
private static volatile RegistryService instance = null;
/**
* Gets instance.
*
* @return the instance
*/
public static RegistryService getInstance() {
if (instance == null) {
synchronized (RegistryFactory.class) {
if (instance == null) {
instance = buildRegistryService();
}
}
}
return instance;
}
private static RegistryService buildRegistryService() {
RegistryType registryType;
String registryTypeName = ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig(
ConfigurationKeys.FILE_ROOT_REGISTRY + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR
+ ConfigurationKeys.FILE_ROOT_TYPE);
try {
registryType = RegistryType.getType(registryTypeName);
} catch (Exception exx) {
throw new NotSupportYetException("not support registry type: " + registryTypeName);
}
if (RegistryType.File == registryType) {
return FileRegistryServiceImpl.getInstance();
} else {
return EnhancedServiceLoader.load(RegistryProvider.class, Objects.requireNonNull(registryType).name()).provide();
}
}
}

View File

@@ -0,0 +1,28 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry;
/**
* the interface registry provider
* @author xingfudeshi@gmail.com
*/
public interface RegistryProvider {
/**
* provide a registry implementation instance
* @return RegistryService
*/
RegistryService provide();
}

View File

@@ -0,0 +1,111 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry;
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import io.seata.config.ConfigurationCache;
import io.seata.config.ConfigurationFactory;
/**
* The interface Registry service.
*
* @param <T> the type parameter
* @author slievrly
*/
public interface RegistryService<T> {
/**
* The constant PREFIX_SERVICE_MAPPING.
*/
String PREFIX_SERVICE_MAPPING = "vgroupMapping.";
/**
* The constant PREFIX_SERVICE_ROOT.
*/
String PREFIX_SERVICE_ROOT = "service";
/**
* The constant CONFIG_SPLIT_CHAR.
*/
String CONFIG_SPLIT_CHAR = ".";
Set<String> SERVICE_GROUP_NAME = new HashSet<>();
/**
* Register.
*
* @param address the address
* @throws Exception the exception
*/
void register(InetSocketAddress address) throws Exception;
/**
* Unregister.
*
* @param address the address
* @throws Exception the exception
*/
void unregister(InetSocketAddress address) throws Exception;
/**
* Subscribe.
*
* @param cluster the cluster
* @param listener the listener
* @throws Exception the exception
*/
void subscribe(String cluster, T listener) throws Exception;
/**
* Unsubscribe.
*
* @param cluster the cluster
* @param listener the listener
* @throws Exception the exception
*/
void unsubscribe(String cluster, T listener) throws Exception;
/**
* Lookup list.
*
* @param key the key
* @return the list
* @throws Exception the exception
*/
List<InetSocketAddress> lookup(String key) throws Exception;
/**
* Close.
* @throws Exception
*/
void close() throws Exception;
/**
* Get current service group name
*
* @param key service group
* @return the service group name
*/
default String getServiceGroup(String key) {
key = PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + PREFIX_SERVICE_MAPPING + key;
if (!SERVICE_GROUP_NAME.contains(key)) {
ConfigurationCache.addConfigListener(key);
SERVICE_GROUP_NAME.add(key);
}
return ConfigurationFactory.getInstance().getConfig(key);
}
}

View File

@@ -0,0 +1,75 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry;
/**
* The enum Registry type.
*
* @author slievrly
*/
public enum RegistryType {
/**
* File registry type.
*/
File,
/**
* ZK registry type.
*/
ZK,
/**
* Redis registry type.
*/
Redis,
/**
* Nacos registry type.
*/
Nacos,
/**
* Eureka registry type.
*/
Eureka,
/**
* Consul registry type
*/
Consul,
/**
* Etcd3 registry type
*/
Etcd3,
/**
* Sofa registry type
*/
Sofa,
/**
* Custom registry type
*/
Custom;
/**
* Gets type.
*
* @param name the name
* @return the type
*/
public static RegistryType getType(String name) {
for (RegistryType registryType : RegistryType.values()) {
if (registryType.name().equalsIgnoreCase(name)) {
return registryType;
}
}
throw new IllegalArgumentException("not support registry type: " + name);
}
}

View File

@@ -0,0 +1,52 @@
#
# Copyright 1999-2019 Seata.io Group.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Copyright 1999-2019 Seata.io Group.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Copyright 1999-2019 Seata.io Group.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
io.seata.discovery.loadbalance.RoundRobinLoadBalance
io.seata.discovery.loadbalance.RandomLoadBalance
io.seata.discovery.loadbalance.ConsistentHashLoadBalance
io.seata.discovery.loadbalance.LeastActiveLoadBalance

View File

@@ -0,0 +1,45 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.config;
import io.seata.discovery.loadbalance.ConsistentHashLoadBalance;
import io.seata.discovery.loadbalance.LoadBalanceFactory;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
/**
* @author Geng Zhang
*/
class ConfigurationFactoryTest {
@Test
void getInstance() {
Configuration configuration = ConfigurationFactory.getInstance();
// check singleton
Assertions.assertEquals(configuration.getClass().getName(), ConfigurationFactory.getInstance().getClass().getName());
}
@Test
void getLoadBalance() {
Configuration configuration = ConfigurationFactory.getInstance();
String loadBalanceType = configuration.getConfig(LoadBalanceFactory.LOAD_BALANCE_TYPE);
int visualNode = configuration.getInt(ConsistentHashLoadBalance.LOAD_BALANCE_CONSISTENT_HASH_VISUAL_NODES);
Assertions.assertEquals("RandomLoadBalance", loadBalanceType);
Assertions.assertEquals(10,visualNode);
}
}

View File

@@ -0,0 +1,130 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.loadbalance;
import io.seata.discovery.registry.RegistryFactory;
import io.seata.discovery.registry.RegistryService;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
/**
* The type Load balance factory test.
*
* @author slievrly
*/
public class LoadBalanceFactoryTest {
private static final String XID = "XID";
/**
* Test get registry.
*
* @param loadBalance the load balance
* @throws Exception the exception
*/
@ParameterizedTest
@MethodSource("instanceProvider")
@Disabled
public void testGetRegistry(LoadBalance loadBalance) throws Exception {
Assertions.assertNotNull(loadBalance);
RegistryService registryService = RegistryFactory.getInstance();
InetSocketAddress address1 = new InetSocketAddress("127.0.0.1", 8091);
InetSocketAddress address2 = new InetSocketAddress("127.0.0.1", 8092);
registryService.register(address1);
registryService.register(address2);
List<InetSocketAddress> addressList = registryService.lookup("my_test_tx_group");
InetSocketAddress balanceAddress = loadBalance.select(addressList, XID);
Assertions.assertNotNull(balanceAddress);
}
/**
* Test get address.
*
* @param loadBalance the load balance
* @throws Exception the exception
*/
@ParameterizedTest
@MethodSource("instanceProvider")
@Disabled
public void testUnRegistry(LoadBalance loadBalance) throws Exception {
RegistryService registryService = RegistryFactory.getInstance();
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8091);
registryService.unregister(address);
}
/**
* Test subscribe.
*
* @param loadBalance the load balance
* @throws Exception the exception
*/
@ParameterizedTest
@MethodSource("instanceProvider")
@Disabled
public void testSubscribe(LoadBalance loadBalance) throws Exception {
Assertions.assertNotNull(loadBalance);
RegistryService registryService = RegistryFactory.getInstance();
InetSocketAddress address1 = new InetSocketAddress("127.0.0.1", 8091);
InetSocketAddress address2 = new InetSocketAddress("127.0.0.1", 8092);
registryService.register(address1);
registryService.register(address2);
List<InetSocketAddress> addressList = registryService.lookup("my_test_tx_group");
InetSocketAddress balanceAddress = loadBalance.select(addressList, XID);
Assertions.assertNotNull(balanceAddress);
//wait trigger testUnRegistry
TimeUnit.SECONDS.sleep(30);
List<InetSocketAddress> addressList1 = registryService.lookup("my_test_tx_group");
Assertions.assertEquals(1, addressList1.size());
}
/**
* Test get address.
*
* @param loadBalance the load balance
* @throws Exception the exception
*/
@ParameterizedTest
@MethodSource("instanceProvider")
public void testGetAddress(LoadBalance loadBalance) throws Exception {
Assertions.assertNotNull(loadBalance);
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8091);
List<InetSocketAddress> addressList = new ArrayList<>();
addressList.add(address);
InetSocketAddress balanceAddress = loadBalance.select(addressList, XID);
Assertions.assertEquals(address, balanceAddress);
}
/**
* Instance provider object [ ] [ ].
*
* @return the object [ ] [ ]
*/
static Stream<Arguments> instanceProvider() {
LoadBalance loadBalance = LoadBalanceFactory.getInstance();
return Stream.of(
Arguments.of(loadBalance)
);
}
}

View File

@@ -0,0 +1,162 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.loadbalance;
import io.seata.common.rpc.RpcStatus;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
/**
* Created by guoyao on 2019/2/14.
*/
public class LoadBalanceTest {
private static final String XID = "XID";
/**
* Test random load balance select.
*
* @param addresses the addresses
*/
@ParameterizedTest
@MethodSource("addressProvider")
public void testRandomLoadBalance_select(List<InetSocketAddress> addresses) {
int runs = 10000;
Map<InetSocketAddress, AtomicLong> counter = getSelectedCounter(runs, addresses, new RandomLoadBalance());
for (InetSocketAddress address : counter.keySet()) {
Long count = counter.get(address).get();
Assertions.assertTrue(count > 0, "selecte one time at last");
}
}
/**
* Test round robin load balance select.
*
* @param addresses the addresses
*/
@ParameterizedTest
@MethodSource("addressProvider")
public void testRoundRobinLoadBalance_select(List<InetSocketAddress> addresses) {
int runs = 10000;
Map<InetSocketAddress, AtomicLong> counter = getSelectedCounter(runs, addresses, new RoundRobinLoadBalance());
for (InetSocketAddress address : counter.keySet()) {
Long count = counter.get(address).get();
Assertions.assertTrue(Math.abs(count - runs / (0f + addresses.size())) < 1f, "abs diff shoud < 1");
}
}
/**
* Test consistent hash load load balance select.
*
* @param addresses the addresses
*/
@ParameterizedTest
@MethodSource("addressProvider")
public void testConsistentHashLoadBalance_select(List<InetSocketAddress> addresses) {
int runs = 10000;
int selected = 0;
ConsistentHashLoadBalance loadBalance = new ConsistentHashLoadBalance();
Map<InetSocketAddress, AtomicLong> counter = getSelectedCounter(runs, addresses, loadBalance);
for (InetSocketAddress address : counter.keySet()) {
if (counter.get(address).get() > 0) {
selected++;
}
}
Assertions.assertEquals(1, selected, "selected must be equal to 1");
}
/**
* Test least active load balance select.
*
* @param addresses the addresses
*/
@ParameterizedTest
@MethodSource("addressProvider")
public void testLeastActiveLoadBalance_select(List<InetSocketAddress> addresses) throws Exception {
int runs = 10000;
int size = addresses.size();
for (int i = 0; i < size - 1; i++) {
RpcStatus.beginCount(addresses.get(i).toString());
}
InetSocketAddress socketAddress = addresses.get(size - 1);
LoadBalance loadBalance = new LeastActiveLoadBalance();
for (int i = 0; i < runs; i++) {
InetSocketAddress selectAddress = loadBalance.select(addresses, XID);
Assertions.assertEquals(selectAddress, socketAddress);
}
RpcStatus.beginCount(socketAddress.toString());
RpcStatus.beginCount(socketAddress.toString());
Map<InetSocketAddress, AtomicLong> counter = getSelectedCounter(runs, addresses, loadBalance);
for (InetSocketAddress address : counter.keySet()) {
Long count = counter.get(address).get();
if (address == socketAddress) {
Assertions.assertEquals(count, 0);
} else {
Assertions.assertTrue(count > 0);
}
}
}
/**
* Gets selected counter.
*
* @param runs the runs
* @param addresses the addresses
* @param loadBalance the load balance
* @return the selected counter
*/
public Map<InetSocketAddress, AtomicLong> getSelectedCounter(int runs, List<InetSocketAddress> addresses,
LoadBalance loadBalance) {
Assertions.assertNotNull(loadBalance);
Map<InetSocketAddress, AtomicLong> counter = new ConcurrentHashMap<>();
for (InetSocketAddress address : addresses) {
counter.put(address, new AtomicLong(0));
}
try {
for (int i = 0; i < runs; i++) {
InetSocketAddress selectAddress = loadBalance.select(addresses, XID);
counter.get(selectAddress).incrementAndGet();
}
} catch (Exception e) {
//do nothing
}
return counter;
}
/**
* Address provider object [ ] [ ].
*
* @return Stream<List < InetSocketAddress>>
*/
static Stream<List<InetSocketAddress>> addressProvider() {
return Stream.of(
Arrays.asList(new InetSocketAddress("127.0.0.1", 8091),
new InetSocketAddress("127.0.0.1", 8092),
new InetSocketAddress("127.0.0.1", 8093),
new InetSocketAddress("127.0.0.1", 8094),
new InetSocketAddress("127.0.0.1", 8095))
);
}
}

View File

@@ -0,0 +1,21 @@
#
# Copyright 1999-2019 Seata.io Group.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#apollo config
registry.redis.serverAddr = 192.168.1.204:6379
registry.redis.db = 6
registry.redis.max.active = 16
service.vgroupMapping.my_test_tx_group = default

View File

@@ -0,0 +1,23 @@
#
# Copyright 1999-2019 Seata.io Group.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#loadBalance config
client {
loadBalance {
type = "RandomLoadBalance"
visualNodes = 10
}
}

View File

@@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 1999-2019 Seata.io Group.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.seata</groupId>
<artifactId>seata-discovery</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>seata-discovery-custom</artifactId>
<name>seata-discovery-custom ${project.version}</name>
<dependencies>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-discovery-core</artifactId>
<version>${project.parent.version}</version>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,53 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry.custom;
import io.seata.common.loader.EnhancedServiceLoader;
import io.seata.common.loader.LoadLevel;
import io.seata.common.util.StringUtils;
import io.seata.config.ConfigurationFactory;
import io.seata.discovery.registry.RegistryProvider;
import io.seata.discovery.registry.RegistryService;
import io.seata.discovery.registry.RegistryType;
import java.util.stream.Stream;
/**
* @author ggndnn
*/
@LoadLevel(name = "Custom")
public class CustomRegistryProvider implements RegistryProvider {
private static final String FILE_CONFIG_KEY_PREFIX = "registry.custom.name";
private final String customName;
public CustomRegistryProvider() {
String name = ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig(FILE_CONFIG_KEY_PREFIX);
if (StringUtils.isBlank(name)) {
throw new IllegalArgumentException("name value of custom registry type must not be blank");
}
if (Stream.of(RegistryType.values())
.anyMatch(ct -> ct.name().equalsIgnoreCase(name))) {
throw new IllegalArgumentException(String.format("custom registry type name %s is not allowed", name));
}
customName = name;
}
@Override
public RegistryService provide() {
return EnhancedServiceLoader.load(RegistryProvider.class, customName).provide();
}
}

View File

@@ -0,0 +1 @@
io.seata.discovery.registry.custom.CustomRegistryProvider

View File

@@ -0,0 +1,28 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry.custom;
import io.seata.common.loader.LoadLevel;
import io.seata.discovery.registry.RegistryProvider;
import io.seata.discovery.registry.RegistryService;
@LoadLevel(name = "forTest")
public class CustomRegistryProviderForTest implements RegistryProvider {
@Override
public RegistryService provide() {
return new CustomRegistryServiceForTest();
}
}

View File

@@ -0,0 +1,55 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry.custom;
import io.seata.config.ConfigChangeListener;
import io.seata.discovery.registry.RegistryService;
import java.net.InetSocketAddress;
import java.util.List;
public class CustomRegistryServiceForTest implements RegistryService<ConfigChangeListener> {
@Override
public void register(InetSocketAddress address) throws Exception {
throw new UnsupportedOperationException();
}
@Override
public void unregister(InetSocketAddress address) throws Exception {
throw new UnsupportedOperationException();
}
@Override
public void subscribe(String cluster, ConfigChangeListener listener) throws Exception {
throw new UnsupportedOperationException();
}
@Override
public void unsubscribe(String cluster, ConfigChangeListener listener) throws Exception {
throw new UnsupportedOperationException();
}
@Override
public List<InetSocketAddress> lookup(String key) throws Exception {
throw new UnsupportedOperationException();
}
@Override
public void close() throws Exception {
throw new UnsupportedOperationException();
}
}

View File

@@ -0,0 +1,29 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry.custom;
import io.seata.discovery.registry.RegistryFactory;
import io.seata.discovery.registry.RegistryService;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class CustomRegistryTest {
@Test
public void testCustomRegistryLoad() {
RegistryService registryService = RegistryFactory.getInstance();
Assertions.assertTrue(registryService instanceof CustomRegistryServiceForTest);
}
}

View File

@@ -0,0 +1 @@
io.seata.discovery.registry.custom.CustomRegistryProviderForTest

View File

@@ -0,0 +1,7 @@
registry {
type = "custom"
custom {
name = "forTest"
}
}

View File

@@ -0,0 +1,60 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 1999-2019 Seata.io Group.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.seata</groupId>
<artifactId>seata-discovery</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>seata-discovery-etcd3</artifactId>
<name>seata-discovery-etcd3 ${project.version}</name>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>seata-discovery-core</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<!--test-->
<dependency>
<groupId>io.etcd</groupId>
<artifactId>jetcd-launcher</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,31 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry.etcd3;
import io.seata.common.loader.LoadLevel;
import io.seata.discovery.registry.RegistryProvider;
import io.seata.discovery.registry.RegistryService;
/**
* @author xingfudeshi@gmail.com
*/
@LoadLevel(name = "Etcd3", order = 1)
public class EtcdRegistryProvider implements RegistryProvider {
@Override
public RegistryService provide() {
return EtcdRegistryServiceImpl.getInstance();
}
}

View File

@@ -0,0 +1,439 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry.etcd3;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.Lease;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.lease.LeaseTimeToLiveResponse;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.LeaseOption;
import io.etcd.jetcd.options.PutOption;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.watch.WatchResponse;
import io.seata.common.exception.ShouldNeverHappenException;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.util.NetUtil;
import io.seata.common.util.StringUtils;
import io.seata.config.Configuration;
import io.seata.config.ConfigurationFactory;
import io.seata.discovery.registry.RegistryService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static io.netty.util.CharsetUtil.UTF_8;
/**
* @author xingfudeshi@gmail.com
*/
public class EtcdRegistryServiceImpl implements RegistryService<Watch.Listener> {
private static final Logger LOGGER = LoggerFactory.getLogger(EtcdRegistryServiceImpl.class);
private static final Configuration FILE_CONFIG = ConfigurationFactory.CURRENT_FILE_INSTANCE;
private static final String FILE_ROOT_REGISTRY = "registry";
private static final String FILE_CONFIG_SPLIT_CHAR = ".";
private static final String REGISTRY_TYPE = "etcd3";
private static final String SERVER_ADDR_KEY = "serverAddr";
private static final String REGISTRY_CLUSTER = "cluster";
private static final String DEFAULT_CLUSTER_NAME = "default";
private static final String REGISTRY_KEY_PREFIX = "registry-seata-";
private static final String FILE_CONFIG_KEY_PREFIX = FILE_ROOT_REGISTRY + FILE_CONFIG_SPLIT_CHAR + REGISTRY_TYPE + FILE_CONFIG_SPLIT_CHAR;
private static final int MAP_INITIAL_CAPACITY = 8;
private static final int THREAD_POOL_SIZE = 2;
private ExecutorService executorService;
/**
* TTL for lease
*/
private static final long TTL = 10;
/**
* interval for life keep
*/
private final static long LIFE_KEEP_INTERVAL = 5;
/**
* critical value for life keep
*/
private final static long LIFE_KEEP_CRITICAL = 6;
private static volatile EtcdRegistryServiceImpl instance;
private static volatile Client client;
private ConcurrentMap<String, Pair<Long /*revision*/, List<InetSocketAddress>>> clusterAddressMap;
private ConcurrentMap<String, Set<Watch.Listener>> listenerMap;
private ConcurrentMap<String, EtcdWatcher> watcherMap;
private static long leaseId = 0;
private EtcdLifeKeeper lifeKeeper = null;
private Future<Boolean> lifeKeeperFuture = null;
/**
* a endpoint for unit testing
*/
public static final String TEST_ENDPONT = "etcd-test-lancher-endpoint";
private EtcdRegistryServiceImpl() {
clusterAddressMap = new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY);
listenerMap = new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY);
watcherMap = new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY);
executorService = new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE, Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("registry-etcd3", THREAD_POOL_SIZE));
}
/**
* get etcd registry service instance
*
* @return instance
*/
static EtcdRegistryServiceImpl getInstance() {
if (instance == null) {
synchronized (EtcdRegistryServiceImpl.class) {
if (instance == null) {
instance = new EtcdRegistryServiceImpl();
}
}
}
return instance;
}
@Override
public void register(InetSocketAddress address) throws Exception {
NetUtil.validAddress(address);
doRegister(address);
}
/**
* do registry
*
* @param address
*/
private void doRegister(InetSocketAddress address) throws Exception {
PutOption putOption = PutOption.newBuilder().withLeaseId(getLeaseId()).build();
getClient().getKVClient().put(buildRegistryKey(address), buildRegistryValue(address), putOption).get();
}
@Override
public void unregister(InetSocketAddress address) throws Exception {
NetUtil.validAddress(address);
doUnregister(address);
}
/**
* do unregister
*
* @param address
* @throws Exception
*/
private void doUnregister(InetSocketAddress address) throws Exception {
getClient().getKVClient().delete(buildRegistryKey(address)).get();
}
@Override
public void subscribe(String cluster, Watch.Listener listener) throws Exception {
listenerMap.computeIfAbsent(cluster, key -> new HashSet<>())
.add(listener);
EtcdWatcher watcher = watcherMap.computeIfAbsent(cluster, w -> new EtcdWatcher(cluster, listener));
executorService.submit(watcher);
}
@Override
public void unsubscribe(String cluster, Watch.Listener listener) throws Exception {
Set<Watch.Listener> subscribeSet = listenerMap.get(cluster);
if (subscribeSet != null) {
Set<Watch.Listener> newSubscribeSet = subscribeSet.stream()
.filter(eventListener -> !eventListener.equals(listener))
.collect(Collectors.toSet());
listenerMap.put(cluster, newSubscribeSet);
}
watcherMap.remove(cluster).stop();
}
@Override
public List<InetSocketAddress> lookup(String key) throws Exception {
final String cluster = getServiceGroup(key);
if (cluster == null) {
return null;
}
if (!listenerMap.containsKey(cluster)) {
//1.refresh
refreshCluster(cluster);
//2.subscribe
subscribe(cluster, new Watch.Listener() {
@Override
public void onNext(WatchResponse response) {
try {
refreshCluster(cluster);
} catch (Exception e) {
LOGGER.error("etcd watch listener", e);
throw new RuntimeException(e.getMessage());
}
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onCompleted() {
}
});
}
Pair<Long, List<InetSocketAddress>> pair = clusterAddressMap.get(cluster);
return Objects.isNull(pair) ? Collections.emptyList() : pair.getValue();
}
@Override
public void close() throws Exception {
if (lifeKeeper != null) {
lifeKeeper.stop();
if (lifeKeeperFuture != null) {
lifeKeeperFuture.get(3, TimeUnit.SECONDS);
}
}
}
/**
* refresh cluster
*
* @param cluster
* @throws Exception
*/
private void refreshCluster(String cluster) throws Exception {
if (cluster == null) {
return;
}
//1.get all available registries
GetOption getOption = GetOption.newBuilder().withPrefix(buildRegistryKeyPrefix(cluster)).build();
GetResponse getResponse = getClient().getKVClient().get(buildRegistryKeyPrefix(cluster), getOption).get();
//2.add to list
List<InetSocketAddress> instanceList = getResponse.getKvs().stream().map(keyValue -> {
String[] instanceInfo = keyValue.getValue().toString(UTF_8).split(":");
return new InetSocketAddress(instanceInfo[0], Integer.parseInt(instanceInfo[1]));
}).collect(Collectors.toList());
clusterAddressMap.put(cluster, new Pair<>(getResponse.getHeader().getRevision(), instanceList));
}
/**
* get client
*
* @return client
*/
private Client getClient() {
if (client == null) {
synchronized (EtcdRegistryServiceImpl.class) {
if (client == null) {
String testEndpoint = System.getProperty(TEST_ENDPONT);
if (StringUtils.isNotBlank(testEndpoint)) {
client = Client.builder().endpoints(testEndpoint).build();
} else {
client = Client.builder().endpoints(FILE_CONFIG.getConfig(FILE_CONFIG_KEY_PREFIX + SERVER_ADDR_KEY)).build();
}
}
}
}
return client;
}
/**
* get cluster name
*
* @return
*/
private String getClusterName() {
String clusterConfigName = String.join(FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY, REGISTRY_TYPE, REGISTRY_CLUSTER);
return FILE_CONFIG.getConfig(clusterConfigName, DEFAULT_CLUSTER_NAME);
}
/**
* create a new lease id or return a existing lease id
*/
private long getLeaseId() throws Exception {
if (0 == leaseId) {
//create a new lease
leaseId = getClient().getLeaseClient().grant(TTL).get().getID();
lifeKeeper = new EtcdLifeKeeper(leaseId);
lifeKeeperFuture = executorService.submit(lifeKeeper);
}
return leaseId;
}
/**
* build registry key
*
* @return registry key
*/
private ByteSequence buildRegistryKey(InetSocketAddress address) {
return ByteSequence.from(REGISTRY_KEY_PREFIX + getClusterName() + "-" + NetUtil.toStringAddress(address), UTF_8);
}
/**
* build registry key prefix
*
* @return registry key prefix
*/
private ByteSequence buildRegistryKeyPrefix(String cluster) {
return ByteSequence.from(REGISTRY_KEY_PREFIX + cluster, UTF_8);
}
/**
* build registry value
*
* @param address
* @return registry value
*/
private ByteSequence buildRegistryValue(InetSocketAddress address) {
return ByteSequence.from(NetUtil.toStringAddress(address), UTF_8);
}
/**
* the type etcd life keeper
*/
private class EtcdLifeKeeper implements Callable<Boolean> {
private final long leaseId;
private final Lease leaseClient;
private boolean running;
public EtcdLifeKeeper(long leaseId) {
this.leaseClient = getClient().getLeaseClient();
this.leaseId = leaseId;
this.running = true;
}
/**
* process
*/
private void process() {
for (; ; ) {
try {
//1.get TTL
LeaseTimeToLiveResponse leaseTimeToLiveResponse = this.leaseClient.timeToLive(this.leaseId, LeaseOption.DEFAULT).get();
final long tTl = leaseTimeToLiveResponse.getTTl();
if (tTl <= LIFE_KEEP_CRITICAL) {
//2.refresh the TTL
this.leaseClient.keepAliveOnce(this.leaseId).get();
}
TimeUnit.SECONDS.sleep(LIFE_KEEP_INTERVAL);
} catch (Exception e) {
LOGGER.error("EtcdLifeKeeper", e);
throw new ShouldNeverHappenException("failed to renewal the lease.");
}
}
}
/**
* stop this task
*/
public void stop() {
this.running = false;
}
@Override
public Boolean call() {
if (this.running) {
process();
}
return this.running;
}
}
/**
* the type etcd watcher
*/
private class EtcdWatcher implements Runnable {
private final Watch.Listener listener;
private Watch.Watcher watcher;
private String cluster;
public EtcdWatcher(String cluster, Watch.Listener listener) {
this.cluster = cluster;
this.listener = listener;
}
@Override
public void run() {
Watch watchClient = getClient().getWatchClient();
WatchOption.Builder watchOptionBuilder = WatchOption.newBuilder().withPrefix(buildRegistryKeyPrefix(cluster));
Pair<Long /*revision*/, List<InetSocketAddress>> addressPair = clusterAddressMap.get(cluster);
if (Objects.nonNull(addressPair)) {
// Maybe addressPair isn't newest now, but it's ok
watchOptionBuilder.withRevision(addressPair.getKey());
}
this.watcher = watchClient.watch(buildRegistryKeyPrefix(cluster), watchOptionBuilder.build(), this.listener);
}
/**
* stop this task
*/
public void stop() {
this.watcher.close();
}
}
private static class Pair<K,V> {
/**
* Key of this <code>Pair</code>.
*/
private K key;
/**
* Value of this this <code>Pair</code>.
*/
private V value;
/**
* Creates a new pair
* @param key The key for this pair
* @param value The value to use for this pair
*/
public Pair(K key, V value) {
this.key = key;
this.value = value;
}
/**
* Gets the key for this pair.
* @return key for this pair
*/
public K getKey() { return key; }
/**
* Gets the value for this pair.
* @return value for this pair
*/
public V getValue() { return value; }
}
}

View File

@@ -0,0 +1 @@
io.seata.discovery.registry.etcd3.EtcdRegistryProvider

View File

@@ -0,0 +1,36 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry.etcd;
import io.seata.discovery.registry.etcd3.EtcdRegistryProvider;
import io.seata.discovery.registry.etcd3.EtcdRegistryServiceImpl;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author xingfudeshi@gmail.com
* the type etcd registry provider test
*/
public class EtcdRegistryProviderTest {
/**
* test provide
*/
@Test
public void testProvide() {
assertThat(new EtcdRegistryProvider().provide()).isInstanceOf(EtcdRegistryServiceImpl.class);
}
}

View File

@@ -0,0 +1,201 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry.etcd;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.launcher.junit.EtcdClusterResource;
import io.etcd.jetcd.options.DeleteOption;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.watch.WatchResponse;
import io.seata.discovery.registry.etcd3.EtcdRegistryProvider;
import io.seata.discovery.registry.etcd3.EtcdRegistryServiceImpl;
import io.seata.discovery.registry.RegistryService;
import org.junit.Rule;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static io.netty.util.CharsetUtil.UTF_8;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author xingfudeshi@gmail.com
*/
@Disabled
public class EtcdRegistryServiceImplTest {
private static final String REGISTRY_KEY_PREFIX = "registry-seata-";
private static final String CLUSTER_NAME = "default";
@Rule
private final static EtcdClusterResource etcd = new EtcdClusterResource(CLUSTER_NAME, 1);
private final Client client = Client.builder().endpoints(etcd.cluster().getClientEndpoints()).build();
private final static String HOST = "127.0.0.1";
private final static int PORT = 8091;
@BeforeAll
public static void beforeClass() throws Exception {
System.setProperty(EtcdRegistryServiceImpl.TEST_ENDPONT, etcd.cluster().getClientEndpoints().get(0).toString());
}
@AfterAll
public static void afterClass() throws Exception {
System.setProperty(EtcdRegistryServiceImpl.TEST_ENDPONT, "");
}
@Test
public void testRegister() throws Exception {
RegistryService registryService = new EtcdRegistryProvider().provide();
InetSocketAddress inetSocketAddress = new InetSocketAddress(HOST, PORT);
//1.register
registryService.register(inetSocketAddress);
//2.get instance information
GetOption getOption = GetOption.newBuilder().withPrefix(buildRegistryKeyPrefix()).build();
long count = client.getKVClient().get(buildRegistryKeyPrefix(), getOption).get().getKvs().stream().filter(keyValue -> {
String[] instanceInfo = keyValue.getValue().toString(UTF_8).split(":");
return HOST.equals(instanceInfo[0]) && PORT == Integer.parseInt(instanceInfo[1]);
}).count();
assertThat(count).isEqualTo(1);
}
@Test
public void testUnregister() throws Exception {
RegistryService registryService = new EtcdRegistryProvider().provide();
InetSocketAddress inetSocketAddress = new InetSocketAddress(HOST, PORT);
//1.register
registryService.register(inetSocketAddress);
//2.get instance information
GetOption getOption = GetOption.newBuilder().withPrefix(buildRegistryKeyPrefix()).build();
long count = client.getKVClient().get(buildRegistryKeyPrefix(), getOption).get().getKvs().stream().filter(keyValue -> {
String[] instanceInfo = keyValue.getValue().toString(UTF_8).split(":");
return HOST.equals(instanceInfo[0]) && PORT == Integer.parseInt(instanceInfo[1]);
}).count();
assertThat(count).isEqualTo(1);
//3.unregister
registryService.unregister(inetSocketAddress);
//4.again get instance information
getOption = GetOption.newBuilder().withPrefix(buildRegistryKeyPrefix()).build();
count = client.getKVClient().get(buildRegistryKeyPrefix(), getOption).get().getKvs().stream().filter(keyValue -> {
String[] instanceInfo = keyValue.getValue().toString(UTF_8).split(":");
return HOST.equals(instanceInfo[0]) && PORT == Integer.parseInt(instanceInfo[1]);
}).count();
assertThat(count).isEqualTo(0);
}
@Test
public void testSubscribe() throws Exception {
RegistryService registryService = new EtcdRegistryProvider().provide();
InetSocketAddress inetSocketAddress = new InetSocketAddress(HOST, PORT);
//1.register
registryService.register(inetSocketAddress);
//2.subscribe
EtcdListener etcdListener = new EtcdListener();
registryService.subscribe(CLUSTER_NAME, etcdListener);
//3.delete instance,see if the listener can be notified
DeleteOption deleteOption = DeleteOption.newBuilder().withPrefix(buildRegistryKeyPrefix()).build();
client.getKVClient().delete(buildRegistryKeyPrefix(), deleteOption).get();
assertThat(etcdListener.isNotified()).isTrue();
}
@Test
public void testUnsubscribe() throws Exception {
RegistryService registryService = new EtcdRegistryProvider().provide();
InetSocketAddress inetSocketAddress = new InetSocketAddress(HOST, PORT);
//1.register
registryService.register(inetSocketAddress);
//2.subscribe
EtcdListener etcdListener = new EtcdListener();
registryService.subscribe(CLUSTER_NAME, etcdListener);
//3.delete instance,see if the listener can be notified
DeleteOption deleteOption = DeleteOption.newBuilder().withPrefix(buildRegistryKeyPrefix()).build();
client.getKVClient().delete(buildRegistryKeyPrefix(), deleteOption).get();
assertThat(etcdListener.isNotified()).isTrue();
//4.unsubscribe
registryService.unsubscribe(CLUSTER_NAME, etcdListener);
//5.reset
etcdListener.reset();
//6.put instance,the listener should not be notified
client.getKVClient().put(buildRegistryKeyPrefix(), ByteSequence.from("test", UTF_8)).get();
assertThat(etcdListener.isNotified()).isFalse();
}
@Test
public void testLookup() throws Exception {
RegistryService registryService = new EtcdRegistryProvider().provide();
InetSocketAddress inetSocketAddress = new InetSocketAddress(HOST, PORT);
//1.register
registryService.register(inetSocketAddress);
//2.lookup
List<InetSocketAddress> inetSocketAddresses = registryService.lookup("my_test_tx_group");
assertThat(inetSocketAddresses).size().isEqualTo(1);
}
/**
* build registry key prefix
*
* @return
*/
private ByteSequence buildRegistryKeyPrefix() {
return ByteSequence.from(REGISTRY_KEY_PREFIX, UTF_8);
}
/**
* etcd listener
*/
private static class EtcdListener implements Watch.Listener {
private boolean notified = false;
@Override
public void onNext(WatchResponse response) {
notified = true;
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onCompleted() {
}
/**
* @return
*/
public boolean isNotified() throws InterruptedException {
TimeUnit.SECONDS.sleep(3);
return notified;
}
/**
* reset
*/
private void reset() {
this.notified = false;
}
}
}

View File

@@ -0,0 +1,47 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 1999-2019 Seata.io Group.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.seata</groupId>
<artifactId>seata-discovery</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>seata-discovery-eureka</artifactId>
<name>seata-discovery-eureka ${project.version}</name>
<dependencies>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-discovery-core</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.netflix.eureka</groupId>
<artifactId>eureka-client</artifactId>
</dependency>
<dependency>
<groupId>com.netflix.archaius</groupId>
<artifactId>archaius-core</artifactId>
</dependency>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,85 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry.eureka;
import io.seata.common.util.StringUtils;
import com.netflix.appinfo.EurekaInstanceConfig;
import com.netflix.appinfo.MyDataCenterInstanceConfig;
/**
* @author: rui_849217@163.com
* override MyDataCenterInstanceConfig for set value,
* eg: instanceId \ipAddress \ applicationName...
*/
public class CustomEurekaInstanceConfig extends MyDataCenterInstanceConfig implements EurekaInstanceConfig {
private String applicationName;
private String instanceId;
private String ipAddress;
private int port = -1;
@Override
public String getInstanceId() {
if (StringUtils.isBlank(instanceId)) {
return super.getInstanceId();
}
return instanceId;
}
@Override
public String getIpAddress() {
if (StringUtils.isBlank(ipAddress)) {
return super.getIpAddress();
}
return ipAddress;
}
@Override
public int getNonSecurePort() {
if (port == -1) {
return super.getNonSecurePort();
}
return port;
}
@Override
public String getAppname() {
if (StringUtils.isBlank(applicationName)) {
return super.getAppname();
}
return applicationName;
}
@Override
public String getHostName(boolean refresh) {
return this.getIpAddress();
}
public void setInstanceId(String instanceId) {
this.instanceId = instanceId;
}
public void setIpAddress(String ipAddress) {
this.ipAddress = ipAddress;
}
public void setPort(int port) {
this.port = port;
}
public void setApplicationName(String applicationName) {
this.applicationName = applicationName;
}
}

View File

@@ -0,0 +1,31 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry.eureka;
import io.seata.common.loader.LoadLevel;
import io.seata.discovery.registry.RegistryService;
import io.seata.discovery.registry.RegistryProvider;
/**
* @author xingfudeshi@gmail.com
*/
@LoadLevel(name = "Eureka", order = 1)
public class EurekaRegistryProvider implements RegistryProvider {
@Override
public RegistryService provide() {
return EurekaRegistryServiceImpl.getInstance();
}
}

View File

@@ -0,0 +1,260 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry.eureka;
import com.netflix.appinfo.ApplicationInfoManager;
import com.netflix.appinfo.InstanceInfo;
import com.netflix.appinfo.providers.EurekaConfigBasedInstanceInfoProvider;
import com.netflix.config.ConfigurationManager;
import com.netflix.discovery.DefaultEurekaClientConfig;
import com.netflix.discovery.DiscoveryClient;
import com.netflix.discovery.EurekaClient;
import com.netflix.discovery.EurekaEventListener;
import com.netflix.discovery.shared.Application;
import io.seata.common.exception.EurekaRegistryException;
import io.seata.common.util.CollectionUtils;
import io.seata.common.util.NetUtil;
import io.seata.common.util.StringUtils;
import io.seata.config.Configuration;
import io.seata.config.ConfigurationFactory;
import io.seata.discovery.registry.RegistryService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
/**
* The type Eureka registry service.
*
* @author: rui_849217@163.com
*/
public class EurekaRegistryServiceImpl implements RegistryService<EurekaEventListener> {
private static final Logger LOGGER = LoggerFactory.getLogger(EurekaRegistryServiceImpl.class);
private static final String DEFAULT_APPLICATION = "default";
private static final String PRO_SERVICE_URL_KEY = "serviceUrl";
private static final String FILE_ROOT_REGISTRY = "registry";
private static final String FILE_CONFIG_SPLIT_CHAR = ".";
private static final String REGISTRY_TYPE = "eureka";
private static final String CLUSTER = "application";
private static final String REGISTRY_WEIGHT = "weight";
private static final String EUREKA_CONFIG_SERVER_URL_KEY = "eureka.serviceUrl.default";
private static final String EUREKA_CONFIG_REFRESH_KEY = "eureka.client.refresh.interval";
private static final String EUREKA_CONFIG_SHOULD_REGISTER = "eureka.registration.enabled";
private static final String EUREKA_CONFIG_METADATA_WEIGHT = "eureka.metadata.weight";
private static final int EUREKA_REFRESH_INTERVAL = 5;
private static final int MAP_INITIAL_CAPACITY = 8;
private static final String DEFAULT_WEIGHT = "1";
private static final Configuration FILE_CONFIG = ConfigurationFactory.CURRENT_FILE_INSTANCE;
private static ConcurrentMap<String, Set<InetSocketAddress>> clusterAddressMap;
private static volatile boolean subscribeListener = false;
private static volatile ApplicationInfoManager applicationInfoManager;
private static volatile CustomEurekaInstanceConfig instanceConfig;
private static volatile EurekaRegistryServiceImpl instance;
private static volatile EurekaClient eurekaClient;
private EurekaRegistryServiceImpl() {
}
static EurekaRegistryServiceImpl getInstance() {
if (instance == null) {
synchronized (EurekaRegistryServiceImpl.class) {
if (instance == null) {
clusterAddressMap = new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY);
instanceConfig = new CustomEurekaInstanceConfig();
instance = new EurekaRegistryServiceImpl();
}
}
}
return instance;
}
@Override
public void register(InetSocketAddress address) throws Exception {
NetUtil.validAddress(address);
instanceConfig.setIpAddress(address.getAddress().getHostAddress());
instanceConfig.setPort(address.getPort());
instanceConfig.setApplicationName(getApplicationName());
instanceConfig.setInstanceId(getInstanceId());
getEurekaClient(true);
applicationInfoManager.setInstanceStatus(InstanceInfo.InstanceStatus.UP);
}
@Override
public void unregister(InetSocketAddress address) throws Exception {
if (eurekaClient == null) {
return;
}
applicationInfoManager.setInstanceStatus(InstanceInfo.InstanceStatus.DOWN);
}
@Override
public void subscribe(String cluster, EurekaEventListener listener) throws Exception {
subscribeListener = true;
getEurekaClient(false).registerEventListener(listener);
}
@Override
public void unsubscribe(String cluster, EurekaEventListener listener) throws Exception {
subscribeListener = false;
getEurekaClient(false).unregisterEventListener(listener);
}
@Override
public List<InetSocketAddress> lookup(String key) throws Exception {
String clusterName = getServiceGroup(key);
if (clusterName == null) {
return null;
}
if (!subscribeListener) {
refreshCluster();
subscribe(null, event -> {
try {
refreshCluster();
} catch (Exception e) {
LOGGER.error("Eureka event listener refreshCluster error:{}", e.getMessage(), e);
}
});
}
return new ArrayList<>(clusterAddressMap.getOrDefault(clusterName.toUpperCase(), Collections.emptySet()));
}
@Override
public void close() throws Exception {
if (eurekaClient != null) {
eurekaClient.shutdown();
}
clean();
}
private void refreshCluster() {
List<Application> applications = getEurekaClient(false).getApplications().getRegisteredApplications();
if (CollectionUtils.isEmpty(applications)) {
clusterAddressMap.clear();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("refreshCluster success, cluster empty!");
}
return;
}
ConcurrentMap<String, Set<InetSocketAddress>> collect = new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY);
for (Application application : applications) {
List<InstanceInfo> instances = application.getInstances();
if (CollectionUtils.isNotEmpty(instances)) {
Set<InetSocketAddress> addressSet = instances.stream()
.map(instance -> new InetSocketAddress(instance.getIPAddr(), instance.getPort()))
.collect(Collectors.toSet());
collect.put(application.getName(), addressSet);
}
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("refreshCluster success, cluster: " + collect);
}
clusterAddressMap = collect;
}
private Properties getEurekaProperties(boolean needRegister) {
Properties eurekaProperties = new Properties();
eurekaProperties.setProperty(EUREKA_CONFIG_REFRESH_KEY, String.valueOf(EUREKA_REFRESH_INTERVAL));
String url = FILE_CONFIG.getConfig(getEurekaServerUrlFileKey());
if (StringUtils.isBlank(url)) {
throw new EurekaRegistryException("eureka server url can not be null!");
}
eurekaProperties.setProperty(EUREKA_CONFIG_SERVER_URL_KEY, url);
String weight = FILE_CONFIG.getConfig(getEurekaInstanceWeightFileKey());
if (StringUtils.isNotBlank(weight)) {
eurekaProperties.setProperty(EUREKA_CONFIG_METADATA_WEIGHT, weight);
} else {
eurekaProperties.setProperty(EUREKA_CONFIG_METADATA_WEIGHT, DEFAULT_WEIGHT);
}
if (!needRegister) {
eurekaProperties.setProperty(EUREKA_CONFIG_SHOULD_REGISTER, "false");
}
return eurekaProperties;
}
private String getApplicationName() {
String application = FILE_CONFIG.getConfig(getEurekaApplicationFileKey());
if (application == null) {
application = DEFAULT_APPLICATION;
}
return application;
}
private EurekaClient getEurekaClient(boolean needRegister) throws EurekaRegistryException {
if (eurekaClient == null) {
synchronized (EurekaRegistryServiceImpl.class) {
try {
if (eurekaClient == null) {
if (!needRegister) {
instanceConfig = new CustomEurekaInstanceConfig();
}
ConfigurationManager.loadProperties(getEurekaProperties(needRegister));
InstanceInfo instanceInfo = new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get();
applicationInfoManager = new ApplicationInfoManager(instanceConfig, instanceInfo);
eurekaClient = new DiscoveryClient(applicationInfoManager, new DefaultEurekaClientConfig());
}
} catch (Exception e) {
clean();
throw new EurekaRegistryException("register eureka is error!", e);
}
}
}
return eurekaClient;
}
private void clean() {
eurekaClient = null;
applicationInfoManager = null;
instanceConfig = null;
}
private String getInstanceId() {
return String.format("%s:%s:%d", instanceConfig.getIpAddress(), instanceConfig.getAppname(),
instanceConfig.getNonSecurePort());
}
private String getEurekaServerUrlFileKey() {
return String.join(FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY, REGISTRY_TYPE, PRO_SERVICE_URL_KEY);
}
private String getEurekaApplicationFileKey() {
return String.join(FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY, REGISTRY_TYPE, CLUSTER);
}
private String getEurekaInstanceWeightFileKey() {
return String.join(FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY, REGISTRY_TYPE, REGISTRY_WEIGHT);
}
}

View File

@@ -0,0 +1 @@
io.seata.discovery.registry.eureka.EurekaRegistryProvider

View File

@@ -0,0 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 1999-2019 Seata.io Group.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.seata</groupId>
<artifactId>seata-discovery</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>seata-discovery-nacos</artifactId>
<name>seata-discovery-nacos ${project.version}</name>
<dependencies>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-discovery-core</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,31 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry.nacos;
import io.seata.common.loader.LoadLevel;
import io.seata.discovery.registry.RegistryService;
import io.seata.discovery.registry.RegistryProvider;
/**
* @author xingfudeshi@gmail.com
*/
@LoadLevel(name = "Nacos", order = 1)
public class NacosRegistryProvider implements RegistryProvider {
@Override
public RegistryService provide() {
return NacosRegistryServiceImpl.getInstance();
}
}

View File

@@ -0,0 +1,248 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry.nacos;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import io.seata.common.util.NetUtil;
import io.seata.common.util.StringUtils;
import io.seata.config.Configuration;
import io.seata.config.ConfigurationFactory;
import io.seata.config.ConfigurationKeys;
import io.seata.discovery.registry.RegistryService;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
/**
* The type Nacos registry service.
*
* @author slievrly
*/
public class NacosRegistryServiceImpl implements RegistryService<EventListener> {
private static final String DEFAULT_NAMESPACE = "";
private static final String DEFAULT_CLUSTER = "default";
private static final String DEFAULT_GROUP = "DEFAULT_GROUP";
private static final String DEFAULT_APPLICATION = "seata-server";
private static final String PRO_SERVER_ADDR_KEY = "serverAddr";
private static final String PRO_NAMESPACE_KEY = "namespace";
private static final String REGISTRY_TYPE = "nacos";
private static final String REGISTRY_CLUSTER = "cluster";
private static final String PRO_APPLICATION_KEY = "application";
private static final String PRO_GROUP_KEY = "group";
private static final String USER_NAME = "username";
private static final String PASSWORD = "password";
private static final Configuration FILE_CONFIG = ConfigurationFactory.CURRENT_FILE_INSTANCE;
private static volatile NamingService naming;
private static final ConcurrentMap<String, List<EventListener>> LISTENER_SERVICE_MAP = new ConcurrentHashMap<>();
private static final ConcurrentMap<String, List<InetSocketAddress>> CLUSTER_ADDRESS_MAP = new ConcurrentHashMap<>();
private static volatile NacosRegistryServiceImpl instance;
private static final Object LOCK_OBJ = new Object();
private NacosRegistryServiceImpl() {
}
/**
* Gets instance.
*
* @return the instance
*/
static NacosRegistryServiceImpl getInstance() {
if (instance == null) {
synchronized (NacosRegistryServiceImpl.class) {
if (instance == null) {
instance = new NacosRegistryServiceImpl();
}
}
}
return instance;
}
@Override
public void register(InetSocketAddress address) throws Exception {
NetUtil.validAddress(address);
getNamingInstance().registerInstance(getServiceName(), getServiceGroup(), address.getAddress().getHostAddress(), address.getPort(), getClusterName());
}
@Override
public void unregister(InetSocketAddress address) throws Exception {
NetUtil.validAddress(address);
getNamingInstance().deregisterInstance(getServiceName(), getServiceGroup(), address.getAddress().getHostAddress(), address.getPort(), getClusterName());
}
@Override
public void subscribe(String cluster, EventListener listener) throws Exception {
List<String> clusters = new ArrayList<>();
clusters.add(cluster);
LISTENER_SERVICE_MAP.computeIfAbsent(cluster, key -> new ArrayList<>())
.add(listener);
getNamingInstance().subscribe(getServiceName(), getServiceGroup(), clusters, listener);
}
@Override
public void unsubscribe(String cluster, EventListener listener) throws Exception {
List<String> clusters = new ArrayList<>();
clusters.add(cluster);
List<EventListener> subscribeList = LISTENER_SERVICE_MAP.get(cluster);
if (subscribeList != null) {
List<EventListener> newSubscribeList = subscribeList.stream()
.filter(eventListener -> !eventListener.equals(listener))
.collect(Collectors.toList());
LISTENER_SERVICE_MAP.put(cluster, newSubscribeList);
}
getNamingInstance().unsubscribe(getServiceName(), getServiceGroup(), clusters, listener);
}
@Override
public List<InetSocketAddress> lookup(String key) throws Exception {
String clusterName = getServiceGroup(key);
if (clusterName == null) {
return null;
}
if (!LISTENER_SERVICE_MAP.containsKey(clusterName)) {
synchronized (LOCK_OBJ) {
if (!LISTENER_SERVICE_MAP.containsKey(clusterName)) {
List<String> clusters = new ArrayList<>();
clusters.add(clusterName);
List<Instance> firstAllInstances = getNamingInstance().getAllInstances(getServiceName(), getServiceGroup(), clusters);
if (null != firstAllInstances) {
List<InetSocketAddress> newAddressList = firstAllInstances.stream()
.filter(instance -> instance.isEnabled() && instance.isHealthy())
.map(instance -> new InetSocketAddress(instance.getIp(), instance.getPort()))
.collect(Collectors.toList());
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);
}
subscribe(clusterName, event -> {
List<Instance> instances = ((NamingEvent) event).getInstances();
if (null == instances && null != CLUSTER_ADDRESS_MAP.get(clusterName)) {
CLUSTER_ADDRESS_MAP.remove(clusterName);
} else if (!CollectionUtils.isEmpty(instances)) {
List<InetSocketAddress> newAddressList = instances.stream()
.filter(instance -> instance.isEnabled() && instance.isHealthy())
.map(instance -> new InetSocketAddress(instance.getIp(), instance.getPort()))
.collect(Collectors.toList());
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);
}
});
}
}
}
return CLUSTER_ADDRESS_MAP.get(clusterName);
}
@Override
public void close() throws Exception {
}
/**
* Gets naming instance.
*
* @return the naming instance
* @throws Exception the exception
*/
public static NamingService getNamingInstance() throws Exception {
if (naming == null) {
synchronized (NacosRegistryServiceImpl.class) {
if (naming == null) {
naming = NacosFactory.createNamingService(getNamingProperties());
}
}
}
return naming;
}
private static Properties getNamingProperties() {
Properties properties = new Properties();
if (System.getProperty(PRO_SERVER_ADDR_KEY) != null) {
properties.setProperty(PRO_SERVER_ADDR_KEY, System.getProperty(PRO_SERVER_ADDR_KEY));
} else {
String address = FILE_CONFIG.getConfig(getNacosAddrFileKey());
if (address != null) {
properties.setProperty(PRO_SERVER_ADDR_KEY, address);
}
}
if (System.getProperty(PRO_NAMESPACE_KEY) != null) {
properties.setProperty(PRO_NAMESPACE_KEY, System.getProperty(PRO_NAMESPACE_KEY));
} else {
String namespace = FILE_CONFIG.getConfig(getNacosNameSpaceFileKey());
if (namespace == null) {
namespace = DEFAULT_NAMESPACE;
}
properties.setProperty(PRO_NAMESPACE_KEY, namespace);
}
String userName = StringUtils.isNotBlank(System.getProperty(USER_NAME)) ? System.getProperty(USER_NAME)
: FILE_CONFIG.getConfig(getNacosUserName());
if (StringUtils.isNotBlank(userName)) {
String password = StringUtils.isNotBlank(System.getProperty(PASSWORD)) ? System.getProperty(PASSWORD)
: FILE_CONFIG.getConfig(getNacosPassword());
if (StringUtils.isNotBlank(password)) {
properties.setProperty(USER_NAME, userName);
properties.setProperty(PASSWORD, password);
}
}
return properties;
}
private static String getClusterName() {
return FILE_CONFIG.getConfig(getNacosClusterFileKey(), DEFAULT_CLUSTER);
}
private static String getServiceName() {
return FILE_CONFIG.getConfig(getNacosApplicationFileKey(), DEFAULT_APPLICATION);
}
private static String getServiceGroup() {
return FILE_CONFIG.getConfig(getNacosApplicationGroupKey(), DEFAULT_GROUP);
}
private static String getNacosAddrFileKey() {
return String.join(ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR, ConfigurationKeys.FILE_ROOT_REGISTRY, REGISTRY_TYPE, PRO_SERVER_ADDR_KEY);
}
private static String getNacosNameSpaceFileKey() {
return String.join(ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR, ConfigurationKeys.FILE_ROOT_REGISTRY, REGISTRY_TYPE, PRO_NAMESPACE_KEY);
}
private static String getNacosClusterFileKey() {
return String.join(ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR, ConfigurationKeys.FILE_ROOT_REGISTRY, REGISTRY_TYPE, REGISTRY_CLUSTER);
}
private static String getNacosApplicationFileKey() {
return String.join(ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR, ConfigurationKeys.FILE_ROOT_REGISTRY, REGISTRY_TYPE, PRO_APPLICATION_KEY);
}
private static String getNacosApplicationGroupKey() {
return String.join(ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR, ConfigurationKeys.FILE_ROOT_REGISTRY, REGISTRY_TYPE, PRO_GROUP_KEY);
}
private static String getNacosUserName() {
return String.join(ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR, ConfigurationKeys.FILE_ROOT_REGISTRY, REGISTRY_TYPE, USER_NAME);
}
private static String getNacosPassword() {
return String.join(ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR, ConfigurationKeys.FILE_ROOT_REGISTRY, REGISTRY_TYPE, PASSWORD);
}
}

View File

@@ -0,0 +1 @@
io.seata.discovery.registry.nacos.NacosRegistryProvider

View File

@@ -0,0 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 1999-2019 Seata.io Group.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.seata</groupId>
<artifactId>seata-discovery</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>seata-discovery-redis</artifactId>
<name>seata-discovery-redis ${project.version}</name>
<dependencies>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-discovery-core</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,39 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry.redis;
/**
* The RedisListener
*
* @author kl @kailing.pub
*/
public interface RedisListener {
/**
* The constant REGISTER.
*/
String REGISTER = "register";
/**
* The constant UN_REGISTER.
*/
String UN_REGISTER = "unregister";
/**
* use for redis event
*
* @param event the event
*/
void onEvent(String event);
}

View File

@@ -0,0 +1,31 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry.redis;
import io.seata.common.loader.LoadLevel;
import io.seata.discovery.registry.RegistryProvider;
import io.seata.discovery.registry.RegistryService;
/**
* @author xingfudeshi@gmail.com
*/
@LoadLevel(name = "Redis", order = 1)
public class RedisRegistryProvider implements RegistryProvider {
@Override
public RegistryService provide() {
return RedisRegistryServiceImpl.getInstance();
}
}

View File

@@ -0,0 +1,259 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry.redis;
import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.stream.Collectors;
import io.seata.common.exception.ShouldNeverHappenException;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.util.NetUtil;
import io.seata.common.util.StringUtils;
import io.seata.config.Configuration;
import io.seata.config.ConfigurationFactory;
import io.seata.discovery.registry.RegistryService;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.Protocol;
/**
* The type Redis registry service.
*
* @author kl @kailing.pub
*/
public class RedisRegistryServiceImpl implements RegistryService<RedisListener> {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisRegistryServiceImpl.class);
private static final String PRO_SERVER_ADDR_KEY = "serverAddr";
private static final String REDIS_FILEKEY_PREFIX = "registry.redis.";
private static final String DEFAULT_CLUSTER = "default";
private static final String REGISTRY_CLUSTER_KEY = "cluster";
private String clusterName;
private static final String REDIS_DB = "db";
private static final String REDIS_PASSWORD = "password";
private static final ConcurrentMap<String, List<RedisListener>> LISTENER_SERVICE_MAP = new ConcurrentHashMap<>();
private static final ConcurrentMap<String, Set<InetSocketAddress>> CLUSTER_ADDRESS_MAP = new ConcurrentHashMap<>();
private static volatile RedisRegistryServiceImpl instance;
private static volatile JedisPool jedisPool;
private ExecutorService threadPoolExecutor = new ScheduledThreadPoolExecutor(1,
new NamedThreadFactory("RedisRegistryService", 1));
private RedisRegistryServiceImpl() {
Configuration seataConfig = ConfigurationFactory.CURRENT_FILE_INSTANCE;
this.clusterName = seataConfig.getConfig(REDIS_FILEKEY_PREFIX + REGISTRY_CLUSTER_KEY, DEFAULT_CLUSTER);
String password = seataConfig.getConfig(getRedisPasswordFileKey());
String serverAddr = seataConfig.getConfig(getRedisAddrFileKey());
String[] serverArr = serverAddr.split(":");
String host = serverArr[0];
int port = Integer.parseInt(serverArr[1]);
int db = seataConfig.getInt(getRedisDbFileKey());
GenericObjectPoolConfig redisConfig = new GenericObjectPoolConfig();
redisConfig.setTestOnBorrow(seataConfig.getBoolean(REDIS_FILEKEY_PREFIX + "test.on.borrow", true));
redisConfig.setTestOnReturn(seataConfig.getBoolean(REDIS_FILEKEY_PREFIX + "test.on.return", false));
redisConfig.setTestWhileIdle(seataConfig.getBoolean(REDIS_FILEKEY_PREFIX + "test.while.idle", false));
int maxIdle = seataConfig.getInt(REDIS_FILEKEY_PREFIX + "max.idle", 0);
if (maxIdle > 0) {
redisConfig.setMaxIdle(maxIdle);
}
int minIdle = seataConfig.getInt(REDIS_FILEKEY_PREFIX + "min.idle", 0);
if (minIdle > 0) {
redisConfig.setMinIdle(minIdle);
}
int maxActive = seataConfig.getInt(REDIS_FILEKEY_PREFIX + "max.active", 0);
if (maxActive > 0) {
redisConfig.setMaxTotal(maxActive);
}
int maxTotal = seataConfig.getInt(REDIS_FILEKEY_PREFIX + "max.total", 0);
if (maxTotal > 0) {
redisConfig.setMaxTotal(maxTotal);
}
int maxWait = seataConfig.getInt(REDIS_FILEKEY_PREFIX + "max.wait",
seataConfig.getInt(REDIS_FILEKEY_PREFIX + "timeout", 0));
if (maxWait > 0) {
redisConfig.setMaxWaitMillis(maxWait);
}
int numTestsPerEvictionRun = seataConfig.getInt(REDIS_FILEKEY_PREFIX + "num.tests.per.eviction.run", 0);
if (numTestsPerEvictionRun > 0) {
redisConfig.setNumTestsPerEvictionRun(numTestsPerEvictionRun);
}
int timeBetweenEvictionRunsMillis = seataConfig.getInt(
REDIS_FILEKEY_PREFIX + "time.between.eviction.runs.millis", 0);
if (timeBetweenEvictionRunsMillis > 0) {
redisConfig.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
}
int minEvictableIdleTimeMillis = seataConfig.getInt(REDIS_FILEKEY_PREFIX + "min.evictable.idle.time.millis",
0);
if (minEvictableIdleTimeMillis > 0) {
redisConfig.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
}
if (StringUtils.isNullOrEmpty(password)) {
jedisPool = new JedisPool(redisConfig, host, port, Protocol.DEFAULT_TIMEOUT, null, db);
} else {
jedisPool = new JedisPool(redisConfig, host, port, Protocol.DEFAULT_TIMEOUT, password, db);
}
}
/**
* Gets instance.
*
* @return the instance
*/
static RedisRegistryServiceImpl getInstance() {
if (instance == null) {
synchronized (RedisRegistryServiceImpl.class) {
if (instance == null) {
instance = new RedisRegistryServiceImpl();
}
}
}
return instance;
}
@Override
public void register(InetSocketAddress address) {
NetUtil.validAddress(address);
String serverAddr = NetUtil.toStringAddress(address);
try (Jedis jedis = jedisPool.getResource()) {
jedis.hset(getRedisRegistryKey(), serverAddr, ManagementFactory.getRuntimeMXBean().getName());
jedis.publish(getRedisRegistryKey(), serverAddr + "-" + RedisListener.REGISTER);
}
}
@Override
public void unregister(InetSocketAddress address) {
NetUtil.validAddress(address);
String serverAddr = NetUtil.toStringAddress(address);
try (Jedis jedis = jedisPool.getResource()) {
jedis.hdel(getRedisRegistryKey(), serverAddr);
jedis.publish(getRedisRegistryKey(), serverAddr + "-" + RedisListener.UN_REGISTER);
}
}
@Override
public void subscribe(String cluster, RedisListener listener) {
String redisRegistryKey = REDIS_FILEKEY_PREFIX + cluster;
LISTENER_SERVICE_MAP.computeIfAbsent(cluster, key -> new ArrayList<>())
.add(listener);
threadPoolExecutor.submit(() -> {
try {
try (Jedis jedis = jedisPool.getResource()) {
jedis.subscribe(new NotifySub(LISTENER_SERVICE_MAP.get(cluster)), redisRegistryKey);
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
});
}
@Override
public void unsubscribe(String cluster, RedisListener listener) {
}
@Override
public List<InetSocketAddress> lookup(String key) {
String clusterName = getServiceGroup(key);
if (clusterName == null) {
return null;
}
if (!LISTENER_SERVICE_MAP.containsKey(clusterName)) {
String redisRegistryKey = REDIS_FILEKEY_PREFIX + clusterName;
Map<String, String> instances;
try (Jedis jedis = jedisPool.getResource()) {
instances = jedis.hgetAll(redisRegistryKey);
}
if (instances != null && !instances.isEmpty()) {
Set<InetSocketAddress> newAddressSet = instances.keySet().stream()
.map(NetUtil::toInetSocketAddress)
.collect(Collectors.toSet());
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressSet);
}
subscribe(clusterName, msg -> {
String[] msgr = msg.split("-");
String serverAddr = msgr[0];
String eventType = msgr[1];
switch (eventType) {
case RedisListener.REGISTER:
CLUSTER_ADDRESS_MAP.get(clusterName).add(NetUtil.toInetSocketAddress(serverAddr));
break;
case RedisListener.UN_REGISTER:
CLUSTER_ADDRESS_MAP.get(clusterName).remove(NetUtil.toInetSocketAddress(serverAddr));
break;
default:
throw new ShouldNeverHappenException("unknown redis msg:" + msg);
}
});
}
return new ArrayList<>(CLUSTER_ADDRESS_MAP.getOrDefault(clusterName, Collections.emptySet()));
}
@Override
public void close() throws Exception {
jedisPool.destroy();
}
private static class NotifySub extends JedisPubSub {
private final List<RedisListener> redisListeners;
/**
* Instantiates a new Notify sub.
*
* @param redisListeners the redis listeners
*/
NotifySub(List<RedisListener> redisListeners) {
this.redisListeners = redisListeners;
}
@Override
public void onMessage(String key, String msg) {
for (RedisListener listener : redisListeners) {
listener.onEvent(msg);
}
}
}
private String getRedisRegistryKey() {
return REDIS_FILEKEY_PREFIX + clusterName;
}
private String getRedisAddrFileKey() {
return REDIS_FILEKEY_PREFIX + PRO_SERVER_ADDR_KEY;
}
private String getRedisPasswordFileKey() {
return REDIS_FILEKEY_PREFIX + REDIS_PASSWORD;
}
private String getRedisDbFileKey() {
return REDIS_FILEKEY_PREFIX + REDIS_DB;
}
}

View File

@@ -0,0 +1 @@
io.seata.discovery.registry.redis.RedisRegistryProvider

View File

@@ -0,0 +1,44 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 1999-2019 Seata.io Group.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.seata</groupId>
<artifactId>seata-discovery</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>seata-discovery-sofa</artifactId>
<name>seata-discovery-sofa ${project.version}</name>
<dependencies>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-discovery-core</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>registry-client-all</artifactId>
</dependency>
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>registry-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,32 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry.sofa;
import io.seata.common.loader.LoadLevel;
import io.seata.discovery.registry.RegistryProvider;
import io.seata.discovery.registry.RegistryService;
/**
* @author leizhiyuan
*/
@LoadLevel(name = "Sofa", order = 1)
public class SofaRegistryProvider implements RegistryProvider {
@Override
public RegistryService provide() {
return SofaRegistryServiceImpl.getInstance();
}
}

View File

@@ -0,0 +1,302 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry.sofa;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import com.alipay.sofa.registry.client.api.RegistryClient;
import com.alipay.sofa.registry.client.api.RegistryClientConfig;
import com.alipay.sofa.registry.client.api.SubscriberDataObserver;
import com.alipay.sofa.registry.client.api.model.RegistryType;
import com.alipay.sofa.registry.client.api.registration.PublisherRegistration;
import com.alipay.sofa.registry.client.api.registration.SubscriberRegistration;
import com.alipay.sofa.registry.client.provider.DefaultRegistryClient;
import com.alipay.sofa.registry.client.provider.DefaultRegistryClientConfigBuilder;
import com.alipay.sofa.registry.core.model.ScopeEnum;
import io.seata.common.util.NetUtil;
import io.seata.config.Configuration;
import io.seata.config.ConfigurationFactory;
import io.seata.discovery.registry.RegistryService;
import org.apache.commons.lang.StringUtils;
import static io.seata.config.ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR;
import static io.seata.config.ConfigurationKeys.FILE_ROOT_REGISTRY;
/**
* The type SOFARegistry registry service.
*
* @author leizhiyuan
*/
public class SofaRegistryServiceImpl implements RegistryService<SubscriberDataObserver> {
private static final String SOFA_FILEKEY_PREFIX = "registry.sofa.";
private static final String PRO_SERVER_ADDR_KEY = "serverAddr";
private static final String PRO_REGION_KEY = "region";
private static final String PRO_DATACENTER_KEY = "datacenter";
private static final String PRO_GROUP_KEY = "group";
private static final String PRO_APPLICATION_KEY = "application";
private static final String PRO_CLUSTER_KEY = "cluster";
private static final String PRO_ADDRESS_WAIT_TIME_KEY = "addressWaitTime";
private static final String DEFAULT_LOCAL_DATACENTER = "DefaultDataCenter";
private static final String DEFAULT_LOCAL_REGION = "DEFAULT_ZONE";
private static final String DEFAULT_GROUP = "SEATA_GROUP";
private static final String DEFAULT_APPLICATION = "default";
private static final String DEFAULT_CLUSTER = "default";
private static final String DEFAULT_ADDRESS_WAIT_TIME = "3000";
private static final Configuration FILE_CONFIG = ConfigurationFactory.CURRENT_FILE_INSTANCE;
private static final String HOST_SEPERATOR = ":";
private static final String REGISTRY_TYPE = "sofa";
private static final ConcurrentMap<String, List<SubscriberDataObserver>> LISTENER_SERVICE_MAP
= new ConcurrentHashMap<>();
private static final ConcurrentMap<String, List<InetSocketAddress>> CLUSTER_ADDRESS_MAP = new ConcurrentHashMap<>();
private static Properties registryProps;
private static volatile RegistryClient registryClient;
private static volatile SofaRegistryServiceImpl instance;
private SofaRegistryServiceImpl() {
}
/**
* Gets instance.
*
* @return the instance
*/
static SofaRegistryServiceImpl getInstance() {
if (instance == null) {
synchronized (SofaRegistryServiceImpl.class) {
if (instance == null) {
registryProps = getNamingProperties();
instance = new SofaRegistryServiceImpl();
}
}
}
return instance;
}
@Override
public void register(InetSocketAddress address) throws Exception {
NetUtil.validAddress(address);
String clusterName = registryProps.getProperty(PRO_CLUSTER_KEY);
PublisherRegistration publisherRegistration = new PublisherRegistration(clusterName);
publisherRegistration.setGroup(registryProps.getProperty(PRO_GROUP_KEY));
String serviceData = address.getAddress().getHostAddress() + HOST_SEPERATOR + address.getPort();
getRegistryInstance().register(publisherRegistration, serviceData);
}
@Override
public void unregister(InetSocketAddress address) throws Exception {
NetUtil.validAddress(address);
String clusterName = registryProps.getProperty(PRO_CLUSTER_KEY);
getRegistryInstance().unregister(clusterName, registryProps.getProperty(PRO_GROUP_KEY), RegistryType.PUBLISHER);
}
private RegistryClient getRegistryInstance() {
if (registryClient == null) {
synchronized (SofaRegistryServiceImpl.class) {
if (registryClient == null) {
String address = registryProps.getProperty(PRO_SERVER_ADDR_KEY);
final String portStr = StringUtils.substringAfter(address, HOST_SEPERATOR);
RegistryClientConfig config = DefaultRegistryClientConfigBuilder.start()
.setAppName(getApplicationName())
.setDataCenter(registryProps.getProperty(PRO_DATACENTER_KEY))
.setZone(registryProps.getProperty(PRO_REGION_KEY))
.setRegistryEndpoint(StringUtils.substringBefore(address, HOST_SEPERATOR))
.setRegistryEndpointPort(Integer.parseInt(portStr)).build();
DefaultRegistryClient result = new DefaultRegistryClient(config);
result.init();
registryClient = result;
}
}
}
return registryClient;
}
@Override
public void subscribe(String cluster, SubscriberDataObserver listener) throws Exception {
SubscriberRegistration subscriberRegistration = new SubscriberRegistration(cluster, listener);
subscriberRegistration.setScopeEnum(ScopeEnum.global);
subscriberRegistration.setGroup(registryProps.getProperty(PRO_GROUP_KEY));
LISTENER_SERVICE_MAP.computeIfAbsent(cluster, key -> new ArrayList<>())
.add(listener);
getRegistryInstance().register(subscriberRegistration);
}
@Override
public void unsubscribe(String cluster, SubscriberDataObserver listener) throws Exception {
getRegistryInstance().unregister(cluster, registryProps.getProperty(PRO_GROUP_KEY), RegistryType.SUBSCRIBER);
}
@Override
public List<InetSocketAddress> lookup(String key) throws Exception {
String clusterName = getServiceGroup(key);
if (clusterName == null) {
return null;
}
if (!LISTENER_SERVICE_MAP.containsKey(clusterName)) {
CountDownLatch respondRegistries = new CountDownLatch(1);
subscribe(clusterName, (dataId, data) -> {
Map<String, List<String>> instances = data.getZoneData();
if (instances == null && CLUSTER_ADDRESS_MAP.get(clusterName) != null) {
CLUSTER_ADDRESS_MAP.remove(clusterName);
} else {
List<InetSocketAddress> newAddressList = flatData(instances);
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);
}
respondRegistries.countDown();
});
//wait max for first lookup
final String property = registryProps.getProperty(PRO_ADDRESS_WAIT_TIME_KEY);
respondRegistries.await(Integer.parseInt(property), TimeUnit.MILLISECONDS);
}
return CLUSTER_ADDRESS_MAP.get(clusterName);
}
private List<InetSocketAddress> flatData(Map<String, List<String>> instances) {
List<InetSocketAddress> result = new ArrayList<>();
for (Map.Entry<String, List<String>> entry : instances.entrySet()) {
for (String str : entry.getValue()) {
String ip = StringUtils.substringBefore(str, HOST_SEPERATOR);
String port = StringUtils.substringAfter(str, HOST_SEPERATOR);
InetSocketAddress inetSocketAddress = new InetSocketAddress(ip, Integer.parseInt(port));
result.add(inetSocketAddress);
}
}
return result;
}
@Override
public void close() throws Exception {
}
private static Properties getNamingProperties() {
Properties properties = new Properties();
if (System.getProperty(SOFA_FILEKEY_PREFIX + PRO_SERVER_ADDR_KEY) != null) {
properties.setProperty(PRO_SERVER_ADDR_KEY, System.getProperty(SOFA_FILEKEY_PREFIX + PRO_SERVER_ADDR_KEY));
} else {
String address = FILE_CONFIG.getConfig(getSofaAddrFileKey());
if (address != null) {
properties.setProperty(PRO_SERVER_ADDR_KEY, address);
}
}
if (System.getProperty(SOFA_FILEKEY_PREFIX + PRO_REGION_KEY) != null) {
properties.setProperty(PRO_REGION_KEY, System.getProperty(SOFA_FILEKEY_PREFIX + PRO_REGION_KEY));
} else {
String region = FILE_CONFIG.getConfig(getSofaRegionFileKey());
if (region == null) {
region = DEFAULT_LOCAL_REGION;
}
properties.setProperty(PRO_REGION_KEY, region);
}
if (System.getProperty(SOFA_FILEKEY_PREFIX + PRO_DATACENTER_KEY) != null) {
properties.setProperty(PRO_DATACENTER_KEY, System.getProperty(SOFA_FILEKEY_PREFIX + PRO_DATACENTER_KEY));
} else {
String datacenter = FILE_CONFIG.getConfig(getSofaDataCenterFileKey());
if (datacenter == null) {
datacenter = DEFAULT_LOCAL_DATACENTER;
}
properties.setProperty(PRO_DATACENTER_KEY, datacenter);
}
if (System.getProperty(SOFA_FILEKEY_PREFIX + PRO_GROUP_KEY) != null) {
properties.setProperty(PRO_GROUP_KEY, System.getProperty(SOFA_FILEKEY_PREFIX + PRO_GROUP_KEY));
} else {
String group = FILE_CONFIG.getConfig(getSofaGroupFileKey());
if (group == null) {
group = DEFAULT_GROUP;
}
properties.setProperty(PRO_GROUP_KEY, group);
}
if (System.getProperty(SOFA_FILEKEY_PREFIX + PRO_CLUSTER_KEY) != null) {
properties.setProperty(PRO_CLUSTER_KEY, System.getProperty(SOFA_FILEKEY_PREFIX + PRO_CLUSTER_KEY));
} else {
String cluster = FILE_CONFIG.getConfig(getSofaClusterFileKey());
if (cluster == null) {
cluster = DEFAULT_CLUSTER;
}
properties.setProperty(PRO_CLUSTER_KEY, cluster);
}
if (System.getProperty(SOFA_FILEKEY_PREFIX + PRO_ADDRESS_WAIT_TIME_KEY) != null) {
properties.setProperty(PRO_ADDRESS_WAIT_TIME_KEY, System.getProperty(SOFA_FILEKEY_PREFIX + PRO_ADDRESS_WAIT_TIME_KEY));
} else {
String group = FILE_CONFIG.getConfig(getSofaAddressWaitTimeFileKey());
if (group == null) {
group = DEFAULT_ADDRESS_WAIT_TIME;
}
properties.setProperty(PRO_ADDRESS_WAIT_TIME_KEY, group);
}
return properties;
}
private static String getSofaClusterFileKey() {
return String.join(FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY, REGISTRY_TYPE, PRO_CLUSTER_KEY);
}
private static String getSofaAddressWaitTimeFileKey() {
return String.join(FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY, REGISTRY_TYPE, PRO_ADDRESS_WAIT_TIME_KEY);
}
private static String getSofaAddrFileKey() {
return String.join(FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY, REGISTRY_TYPE, PRO_SERVER_ADDR_KEY);
}
private static String getSofaRegionFileKey() {
return String.join(FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY, REGISTRY_TYPE, PRO_REGION_KEY);
}
private static String getSofaDataCenterFileKey() {
return String.join(FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY, REGISTRY_TYPE, PRO_DATACENTER_KEY);
}
private static String getSofaGroupFileKey() {
return String.join(FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY, REGISTRY_TYPE, PRO_GROUP_KEY);
}
private String getApplicationFileKey() {
return String.join(FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY, REGISTRY_TYPE, PRO_APPLICATION_KEY);
}
private String getApplicationName() {
String application = FILE_CONFIG.getConfig(getApplicationFileKey());
if (application == null) {
application = DEFAULT_APPLICATION;
}
return application;
}
}

View File

@@ -0,0 +1 @@
io.seata.discovery.registry.sofa.SofaRegistryProvider

View File

@@ -0,0 +1,113 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry.sofa;
import com.alipay.sofa.registry.server.test.TestRegistryMain;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* The type SofaRegistryServiceImpl test.
*
* @author leizhiyuan
*/
public class SofaRegistryServiceImplTest {
private static TestRegistryMain registryMain;
@BeforeAll
public static void beforeClass() {
System.setProperty("serverAddr", "127.0.0.1:9603");
System.setProperty("addressWaitTime", "10000");
registryMain = new TestRegistryMain();
try {
registryMain.startRegistry();
} catch (Exception e) {
Assertions.fail("start sofaregistry fail");
}
}
@Test
public void testSofaRegistry() {
final InetSocketAddress address = new InetSocketAddress(1234);
final SofaRegistryServiceImpl instance = SofaRegistryServiceImpl.getInstance();
try {
instance.register(address);
} catch (Exception e) {
Assertions.fail(e.getMessage());
}
//need sofa registry to sync data
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
}
List<InetSocketAddress> result = new ArrayList<>();
try {
result = instance.lookup("my_test_tx_group");
} catch (Exception e) {
Assertions.fail(e.getMessage());
}
Assertions.assertTrue(result.size() > 0);
Assertions.assertEquals(address, result.get(0));
try {
instance.unregister(address);
} catch (Exception e) {
Assertions.fail(e.getMessage());
}
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException ignore) {
}
try {
result = instance.lookup("my_test_tx_group");
} catch (Exception e) {
Assertions.fail(e.getMessage());
}
Assertions.assertEquals(0, result.size());
}
@AfterAll
public static void afterClass() {
System.setProperty("serverAddr", "");
System.setProperty("addressWaitTime", "0");
try {
registryMain.stopRegistry();
} catch (Exception ignore) {
//ignore
}
}
}

View File

@@ -0,0 +1,44 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 1999-2019 Seata.io Group.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.seata</groupId>
<artifactId>seata-discovery</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>seata-discovery-zk</artifactId>
<name>seata-discovery-zk ${project.version}</name>
<dependencies>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-discovery-core</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,312 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry.zk;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import io.seata.common.util.CollectionUtils;
import io.seata.common.util.NetUtil;
import io.seata.common.util.StringUtils;
import io.seata.config.Configuration;
import io.seata.config.ConfigurationFactory;
import io.seata.discovery.registry.RegistryService;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static io.seata.common.Constants.IP_PORT_SPLIT_CHAR;
/**
* zookeeper path as /registry/zk/
*
* @author crazier.huang
*/
public class ZookeeperRegisterServiceImpl implements RegistryService<IZkChildListener> {
private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperRegisterServiceImpl.class);
private static volatile ZookeeperRegisterServiceImpl instance;
private static volatile ZkClient zkClient;
private static final Configuration FILE_CONFIG = ConfigurationFactory.CURRENT_FILE_INSTANCE;
private static final String ZK_PATH_SPLIT_CHAR = "/";
private static final String FILE_ROOT_REGISTRY = "registry";
private static final String FILE_CONFIG_SPLIT_CHAR = ".";
private static final String REGISTRY_CLUSTER = "cluster";
private static final String REGISTRY_TYPE = "zk";
private static final String SERVER_ADDR_KEY = "serverAddr";
private static final String AUTH_USERNAME = "username";
private static final String AUTH_PASSWORD = "password";
private static final String SESSION_TIME_OUT_KEY = "sessionTimeout";
private static final String CONNECT_TIME_OUT_KEY = "connectTimeout";
private static final int DEFAULT_SESSION_TIMEOUT = 6000;
private static final int DEFAULT_CONNECT_TIMEOUT = 2000;
private static final String FILE_CONFIG_KEY_PREFIX = FILE_ROOT_REGISTRY + FILE_CONFIG_SPLIT_CHAR + REGISTRY_TYPE
+ FILE_CONFIG_SPLIT_CHAR;
private static final String ROOT_PATH = ZK_PATH_SPLIT_CHAR + FILE_ROOT_REGISTRY + ZK_PATH_SPLIT_CHAR + REGISTRY_TYPE
+ ZK_PATH_SPLIT_CHAR;
private static final String ROOT_PATH_WITHOUT_SUFFIX = ZK_PATH_SPLIT_CHAR + FILE_ROOT_REGISTRY + ZK_PATH_SPLIT_CHAR
+ REGISTRY_TYPE;
private static final ConcurrentMap<String, List<InetSocketAddress>> CLUSTER_ADDRESS_MAP = new ConcurrentHashMap<>();
private static final ConcurrentMap<String, List<IZkChildListener>> LISTENER_SERVICE_MAP = new ConcurrentHashMap<>();
private static final int REGISTERED_PATH_SET_SIZE = 1;
private static final Set<String> REGISTERED_PATH_SET = Collections.synchronizedSet(new HashSet<>(REGISTERED_PATH_SET_SIZE));
private ZookeeperRegisterServiceImpl() {
}
static ZookeeperRegisterServiceImpl getInstance() {
if (instance == null) {
synchronized (ZookeeperRegisterServiceImpl.class) {
if (instance == null) {
instance = new ZookeeperRegisterServiceImpl();
}
}
}
return instance;
}
@Override
public void register(InetSocketAddress address) throws Exception {
NetUtil.validAddress(address);
String path = getRegisterPathByPath(address);
doRegister(path);
}
private boolean doRegister(String path) {
if (checkExists(path)) {
return false;
}
createParentIfNotPresent(path);
getClientInstance().createEphemeral(path, true);
REGISTERED_PATH_SET.add(path);
return true;
}
private void createParentIfNotPresent(String path) {
int i = path.lastIndexOf('/');
if (i > 0) {
String parent = path.substring(0, i);
if (!checkExists(parent)) {
getClientInstance().createPersistent(parent);
}
}
}
private boolean checkExists(String path) {
return getClientInstance().exists(path);
}
@Override
public void unregister(InetSocketAddress address) throws Exception {
NetUtil.validAddress(address);
String path = getRegisterPathByPath(address);
getClientInstance().delete(path);
REGISTERED_PATH_SET.remove(path);
}
@Override
public void subscribe(String cluster, IZkChildListener listener) throws Exception {
if (cluster == null) {
return;
}
String path = ROOT_PATH + cluster;
if (!getClientInstance().exists(path)) {
getClientInstance().createPersistent(path);
}
getClientInstance().subscribeChildChanges(path, listener);
LISTENER_SERVICE_MAP.computeIfAbsent(cluster, key -> new CopyOnWriteArrayList<>())
.add(listener);
}
@Override
public void unsubscribe(String cluster, IZkChildListener listener) throws Exception {
if (cluster == null) {
return;
}
String path = ROOT_PATH + cluster;
if (getClientInstance().exists(path)) {
getClientInstance().unsubscribeChildChanges(path, listener);
List<IZkChildListener> subscribeList = LISTENER_SERVICE_MAP.get(cluster);
if (subscribeList != null) {
List<IZkChildListener> newSubscribeList = subscribeList.stream()
.filter(eventListener -> !eventListener.equals(listener))
.collect(Collectors.toList());
LISTENER_SERVICE_MAP.put(cluster, newSubscribeList);
}
}
}
/**
* @param key the key
* @return
* @throws Exception
*/
@Override
public List<InetSocketAddress> lookup(String key) throws Exception {
String clusterName = getServiceGroup(key);
if (clusterName == null) {
return null;
}
return doLookup(clusterName);
}
// visible for test.
List<InetSocketAddress> doLookup(String clusterName) throws Exception {
boolean exist = getClientInstance().exists(ROOT_PATH + clusterName);
if (!exist) {
return null;
}
if (!LISTENER_SERVICE_MAP.containsKey(clusterName)) {
List<String> childClusterPath = getClientInstance().getChildren(ROOT_PATH + clusterName);
refreshClusterAddressMap(clusterName, childClusterPath);
subscribeCluster(clusterName);
}
return CLUSTER_ADDRESS_MAP.get(clusterName);
}
@Override
public void close() throws Exception {
getClientInstance().close();
}
private ZkClient getClientInstance() {
if (zkClient == null) {
synchronized (ZookeeperRegisterServiceImpl.class) {
if (zkClient == null) {
zkClient = buildZkClient(FILE_CONFIG.getConfig(FILE_CONFIG_KEY_PREFIX + SERVER_ADDR_KEY),
FILE_CONFIG.getInt(FILE_CONFIG_KEY_PREFIX + SESSION_TIME_OUT_KEY, DEFAULT_SESSION_TIMEOUT),
FILE_CONFIG.getInt(FILE_CONFIG_KEY_PREFIX + CONNECT_TIME_OUT_KEY, DEFAULT_CONNECT_TIMEOUT),
FILE_CONFIG.getConfig(FILE_CONFIG_KEY_PREFIX + AUTH_USERNAME),
FILE_CONFIG.getConfig(FILE_CONFIG_KEY_PREFIX + AUTH_PASSWORD));
}
}
}
return zkClient;
}
// visible for test.
ZkClient buildZkClient(String address, int sessionTimeout, int connectTimeout,String... authInfo) {
ZkClient zkClient = new ZkClient(address, sessionTimeout, connectTimeout);
if (authInfo != null && authInfo.length == 2) {
if (!StringUtils.isBlank(authInfo[0]) && !StringUtils.isBlank(authInfo[1])) {
StringBuilder auth = new StringBuilder(authInfo[0]).append(":").append(authInfo[1]);
zkClient.addAuthInfo("digest", auth.toString().getBytes());
}
}
if (!zkClient.exists(ROOT_PATH_WITHOUT_SUFFIX)) {
zkClient.createPersistent(ROOT_PATH_WITHOUT_SUFFIX, true);
}
zkClient.subscribeStateChanges(new IZkStateListener() {
@Override
public void handleStateChanged(Watcher.Event.KeeperState keeperState) throws Exception {
//ignore
}
@Override
public void handleNewSession() throws Exception {
recover();
}
@Override
public void handleSessionEstablishmentError(Throwable throwable) throws Exception {
//ignore
}
});
return zkClient;
}
private void recover() throws Exception {
// recover Server
if (!REGISTERED_PATH_SET.isEmpty()) {
REGISTERED_PATH_SET.forEach(this::doRegister);
}
// recover client
if (!LISTENER_SERVICE_MAP.isEmpty()) {
Map<String, List<IZkChildListener>> listenerMap = new HashMap<>(LISTENER_SERVICE_MAP);
LISTENER_SERVICE_MAP.clear();
for (Map.Entry<String, List<IZkChildListener>> listenerEntry : listenerMap.entrySet()) {
List<IZkChildListener> iZkChildListeners = listenerEntry.getValue();
if (CollectionUtils.isEmpty(iZkChildListeners)) {
continue;
}
for (IZkChildListener listener : iZkChildListeners) {
subscribe(listenerEntry.getKey(), listener);
}
}
}
}
private void subscribeCluster(String cluster) throws Exception {
subscribe(cluster, (parentPath, currentChilds) -> {
String clusterName = parentPath.replace(ROOT_PATH, "");
if (CollectionUtils.isEmpty(currentChilds) && CLUSTER_ADDRESS_MAP.get(clusterName) != null) {
CLUSTER_ADDRESS_MAP.remove(clusterName);
} else if (!CollectionUtils.isEmpty(currentChilds)) {
refreshClusterAddressMap(clusterName, currentChilds);
}
});
}
private void refreshClusterAddressMap(String clusterName, List<String> instances) {
List<InetSocketAddress> newAddressList = new ArrayList<>();
if (instances == null) {
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);
return;
}
for (String path : instances) {
try {
String[] ipAndPort = path.split(IP_PORT_SPLIT_CHAR);
newAddressList.add(new InetSocketAddress(ipAndPort[0], Integer.parseInt(ipAndPort[1])));
} catch (Exception e) {
LOGGER.warn("The cluster instance info is error, instance info:{}", path);
}
}
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);
}
private String getClusterName() {
String clusterConfigName = String.join(FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY, REGISTRY_TYPE, REGISTRY_CLUSTER);
return FILE_CONFIG.getConfig(clusterConfigName);
}
private String getRegisterPathByPath(InetSocketAddress address) {
return ROOT_PATH + getClusterName() + ZK_PATH_SPLIT_CHAR + NetUtil.toStringAddress(address);
}
}

View File

@@ -0,0 +1,31 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry.zk;
import io.seata.common.loader.LoadLevel;
import io.seata.discovery.registry.RegistryProvider;
import io.seata.discovery.registry.RegistryService;
/**
* @author xingfudeshi@gmail.com
*/
@LoadLevel(name = "ZK", order = 1)
public class ZookeeperRegistryProvider implements RegistryProvider {
@Override
public RegistryService provide() {
return ZookeeperRegisterServiceImpl.getInstance();
}
}

View File

@@ -0,0 +1 @@
io.seata.discovery.registry.zk.ZookeeperRegistryProvider

View File

@@ -0,0 +1,99 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry.zk;
import io.seata.common.util.NetUtil;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.curator.test.TestingServer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* @author Geng Zhang
*/
public class ZookeeperRegisterServiceImplTest {
protected static TestingServer server = null;
@BeforeAll
public static void adBeforeClass() throws Exception {
server = new TestingServer(2181, true);
server.start();
}
@AfterAll
public static void adAfterClass() throws Exception {
if (server != null) {
server.stop();
}
}
ZookeeperRegisterServiceImpl service = (ZookeeperRegisterServiceImpl) new ZookeeperRegistryProvider().provide();
@Test
public void getInstance() {
ZookeeperRegisterServiceImpl service1 = ZookeeperRegisterServiceImpl.getInstance();
Assertions.assertEquals(service1, service);
}
@Test
public void buildZkTest() {
ZkClient client = service.buildZkClient("127.0.0.1:2181", 5000, 5000);
Assertions.assertTrue(client.exists("/zookeeper"));
}
@Test
public void testAll() throws Exception {
service.register(new InetSocketAddress(NetUtil.getLocalAddress(), 33333));
Assertions.assertNull(service.lookup("xxx"));
List<InetSocketAddress> lookup2 = service.doLookup("default");
Assertions.assertEquals(1, lookup2.size());
final List<String> data = new ArrayList<>();
final CountDownLatch latch = new CountDownLatch(1);
IZkChildListener listener = (s, list) -> {
data.clear();
data.addAll(list);
latch.countDown();
};
service.subscribe("default", listener);
final CountDownLatch latch2 = new CountDownLatch(1);
final List<String> data2 = new ArrayList<>();
IZkChildListener listener2 = (s, list) -> {
data2.clear();
data2.addAll(list);
latch2.countDown();
};
service.subscribe("default", listener2);
service.unregister(new InetSocketAddress(NetUtil.getLocalAddress(), 33333));
latch2.await(1000, TimeUnit.MILLISECONDS);
Assertions.assertEquals(0, data2.size());
service.unsubscribe("default", listener);
service.unsubscribe("default", listener2);
}
}

View File

@@ -0,0 +1,32 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.discovery.registry.zk;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
/**
* @author Geng Zhang
*/
public class ZookeeperRegistryProviderTest {
@Test
public void provide() {
ZookeeperRegistryProvider provider = new ZookeeperRegistryProvider();
Assertions.assertTrue(provider.provide() instanceof ZookeeperRegisterServiceImpl);
}
}