Apache Ignite 学习笔记(五): Primary和backup数据同步模式和处理分片丢失的策略

上一篇文章我们介绍了Ignite数据网格中不同的数据分片冗余策略:Replicated和Partition模式。无论是哪种模式,其实就是通过对数据分片在不同的节点上做多个拷贝来保证数据的可用性。在一个多个节点组成的分布式系统中,一旦需要做数据拷贝,自然就要考虑数据拷贝的过程是同步的还是异步的。而且,在partition模式下,一个节点也许不会有数据的所有分片,那势必会出现某个数据分片的primar...

Apache Ignite 学习笔记(五): Primary和backup数据同步模式和处理分片丢失的策略

上一篇文章我们介绍了Ignite数据网格中不同的数据分片冗余策略:Replicated和Partition模式。无论是哪种模式,其实就是通过对数据分片在不同的节点上做多个拷贝来保证数据的可用性。在一个多个节点组成的分布式系统中,一旦需要做数据拷贝,自然就要考虑数据拷贝的过程是同步的还是异步的。而且,在partition模式下,一个节点也许不会有数据的所有分片,那势必会出现某个数据分片的primary和backup拷贝由于节点故障,在集群中访问不到的情况。这篇文章我们就接着看看,针对数据拷贝以及数据分片丢失,Ignite提供了哪些选项,我们又该怎样处理。

Primary和Backup之间的同步/异步拷贝


Ignite针对primary和backup之间的数据拷贝提供了三种同步模式:

  • PRIMARY_SYNC: 默认情况下Ignite采用的同步模式。写cache的操作在数据分片的primary节点成功写入即可返回,不用等待backup节点数据成功写入。这也意味着,如果此时从backup节点读数据,有可能读到的任然是旧数据。
  • FULL_SYNC: 写cache的操作在primary节点和backup节点都成功写入后返回。和PRIMARY_SYNC模式相比,这个模式保证了写入成功后节点之间的数据都一样。
  • FULL_ASYNC: 写cache的操作不用等primary节点和backup节点成功写入即可返回。和PRIMARY_SYNC模式相比,此时即便是读primary节点的数据都有可能读到旧数据。

三种同步模式如何选择,完全取决于应用对数据一致性,可用性和性能的要求。FULL_SYNC保证新的数据同步到了primary和backup节点上,自然对写操作的性能影响是最大的。PRIMARY_SYNC则只保证数据同步到了primary节点上,这个模式牺牲一定的可用性换取了比FULL_SYNC更好的写性能。而FULL_ASYNC因为是完全异步的,所以有可能会出现数据丢失,这里牺牲了数据的可用性,换取更好的写性能。

我们可以通过XML配置文件或者是代码中之间配置同步模式。下面是XML配置文件:

<beans xmlns=“http://www.springframework.org/schema/beans“ xmlns:xsi=“http://www.w3.org/2001/XMLSchema-instance“ xsi:schemaLocation=“ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd“> <bean id=“grid.cfg“ class=“org.apache.ignite.configuration.IgniteConfiguration“>  <property name=“discoverySpi“><bean class=“org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi“> <property name=“ipFinder“>  <bean class=“org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder“><property name=“multicastGroup“ value=“224.0.0.251“/>  </bean> </property></bean>  </property>  <property name=“cacheConfiguration“><bean class=“org.apache.ignite.configuration.CacheConfiguration“> <!-- 设置缓存名字. --> <property name=“name“ value=“TEST“/> <!-- 设置缓存模式. --> <property name=“cacheMode“ value=“PARTITIONED“/> <property name=“backups“ value=“1“/> <!-- 下面将缓存设置为replicated模式 --> <!--property name=“cacheMode“ value=“REPLICATED“/--> <property name=“writeSynchronizationMode“ value=“FULL_SYNC“/></bean>  </property> </bean></beans>

下面例子是如何在Java代码中设置同步模式

 ... CacheConfiguration<String, String> cacheCfg = new CacheConfiguration(“TEST“); cacheCfg.setCacheMode(CacheMode.PARTITIONED); cacheCfg.setBackups(1); cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); ...

数据分片丢失的处理


在partition模式下, 数据分片后存放在primary和backup节点上,一旦出现某块数据分片的所有primary和backup拷贝由于节点故障无法访问时,就出现了“partition loss“的情况。用上一篇文章的partitoned cached图来举个例子:

图中cache用的模式partition模式,backup数量是1,所以数据分片有一个primar和backup拷贝,如果JVM1和JVM4出现故障,那么分片D的primary拷贝和backup拷贝全都无法访问。这时候,如果允许用户的读写操作继续读取分片D数据,那数据的一致性就无法保证了。我们可以通过监听EVT_CACHE_REBALANCE_PART_DATA_LOST事件,及时知道集群中出现partition loss,然后采取相应措施。另外,Ignite提供了不同的处理策略,让你可以针对不同的场景选择不同的策略:

  • IGNORE: 如果不进行配置,这是默认情况下的策略。即使出现了partition loss的情况,Ignite会自动忽略并且会清空和partion loss相关的状态不会触发EVT_CACHE_REBALANCE_PART_DATA_LOST事件。
  • READ_WRITE_ALL: Ignite允许所有的读写操作,就好像partition loss没发生过。和IGNORE策略最大的不同,该策略虽然允许继续读写,但会触发EVT_CACHE_REBALANCE_PART_DATA_LOST事件。
  • READ_WRITE_SAFE: 允许对没有丢失的partition的读写操作,但是对已经丢失的partition的读写操作会失败并抛异常。
  • READ_ONLY_ALL: 允许对丢失的和正常的partition的读操作,但是写操作会失败并抛异常。
  • READ_ONLY_SAFE: 所有的写操作和对丢失partition的读操作都会失败并抛异常。允许对正常的partition的读操作。

下面,让我们用一个例子演示下如何配置partition loss的策略,以及如何通过监听EVT_CACHE_REBALANCE_PART_DATA_LOST处理paritition loss的事件:

  1. 启动2个server实例。
  2. server实例启动后,启动一个client节点连上集群,用CacheMode.PARTITIONED模式创建一个backup数量为0的cache(将backup数量设为0为了方便模拟partition丢失的场景), 然后往cache里写一些数据,并监听EVT_CACHE_REBALANCE_PART_DATA_LOST事件。
  3. 关掉一个server实例模拟节点故障触发partition loss。
  4. 观察client实例能否收到EVT_CACHE_REBALANCE_PART_DATA_LOST事件,在发生partition loss后启用不同策略继续读写cache的行为,以及如何重置集群状态让读写恢复正常。

因为server节点的逻辑很简单(实际上2个server节点就是启动后组成一个Ignite集群),我们看看client节点的代码:

public class IgnitePartitionLossExampleClient { private static AtomicBoolean partitionLost = new AtomicBoolean(false); public static void main(String[] args) {  Ignite ignite;  if (args.length == 1 && !args[0].isEmpty()) {//如果启动时指定了配置文件,则用指定的配置文件System.out.println(“Use “args[0]“ to start.“);ignite = Ignition.start(args[0]);  } else {//如果启动时没指定配置文件,则生成一个配置文件System.out.println(“Create an IgniteConfiguration to start.“);TcpDiscoverySpi spi = new TcpDiscoverySpi();TcpDiscoveryMulticastIpFinder ipFinder = new TcpDiscoveryMulticastIpFinder();ipFinder.setMulticastGroup(“224.0.0.251“);spi.setIpFinder(ipFinder);IgniteConfiguration cfg = new IgniteConfiguration();cfg.setDiscoverySpi(spi);cfg.setClientMode(true);//默认由于性能原因,Ignite会忽略所有事件,这里要主动配置需要监听的事件cfg.setIncludeEventTypes(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);ignite = Ignition.start(cfg);  }  // 创建一个TEST缓存, cache mode设为PARTITIONED, backup数量为1, 并把partition loss policy设为READ_WRITE_SAFE  CacheConfiguration<String, String> cacheCfg = new CacheConfiguration<>();  cacheCfg.setName(“TEST“);  cacheCfg.setCacheMode(CacheMode.PARTITIONED);  cacheCfg.setBackups(0);  cacheCfg.setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE);  IgniteCache<String, String> cityProvinceCache = ignite.getOrCreateCache(cacheCfg);  // Local listener that listens to local events.  IgnitePredicate<CacheRebalancingEvent> locLsnr = evt -> {try { System.out.println(“=========Received event [evt=“evt.name()“]==========“); Collection<Integer> lostPartitions = cityProvinceCache.lostPartitions(); if (lostPartitions != null) {  partitionLost.set(true); } return true; // Continue listening.} catch (Exception e) { System.out.println(e);}System.out.println(“=========Stop listening==========“);return false;  };  // Subscribe to specified cache events occuring on local node.  ignite.events().localListen(locLsnr, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);  List<String> cities = new ArrayList<String>(Arrays.asList(“Edmonton“, “Calgary“, “Markham“, “Toronto“, “Richmond Hill“, “Montreal“));  // 写入一些数据, key是城市的名字,value是省的名字  populateCityProvinceData(cityProvinceCache);  //用下面的while循环不停模拟对cache的读写操作  while(true) {try { for (String city : cities) {  try { if (!partitionLost.get()) {  //如果cache一切正常,则正常读  getAndPrintCityProvince(city, cityProvinceCache); } else {  //如果cache出现partition lost
源文地址:https://www.guoxiongfei.cn/cntech/19297.html