1 Ekim 2021 Cuma

Apache Ignite IgniteCache Arayüzü - Data Grid API İçindir

Örnek
IgniteConfiguration şöyle olsun
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;

public class IgniteConfigurationManager {

  public static IgniteConfiguration getConfiguration() {
    IgniteConfiguration igniteConfiguration = new IgniteConfiguration();
    igniteConfiguration.setIgniteInstanceName("ItemsGrid");
    igniteConfiguration.setPeerClassLoadingEnabled(true);
    igniteConfiguration.setLocalHost("127.0.0.1");
    TcpDiscoverySpi tcpDiscoverySpi = new TcpDiscoverySpi();
    TcpDiscoveryMulticastIpFinder ipFinder = new TcpDiscoveryMulticastIpFinder();
    ipFinder.setAddresses(Collections.singletonList("127.0.0.1:47500..47509"));
    tcpDiscoverySpi.setIpFinder(ipFinder);
    tcpDiscoverySpi.setLocalPortRange(9);
    igniteConfiguration.setDiscoverySpi(tcpDiscoverySpi);
    TcpCommunicationSpi communicationSpi = new TcpCommunicationSpi();
    communicationSpi.setLocalAddress("localhost");
    communicationSpi.setLocalPort(48100);
    communicationSpi.setSlowClientQueueLimit(1000);
    igniteConfiguration.setCommunicationSpi(communicationSpi);
    igniteConfiguration.setCacheConfiguration(cacheConfiguration());
    return igniteConfiguration;
  }

  private static CacheConfiguration[] cacheConfiguration() {
    List<CacheConfiguration> cacheConfigurations = new ArrayList<>();
    CacheConfiguration<Long, Item> cacheConfiguration = new CacheConfiguration();
    cacheConfiguration.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
    cacheConfiguration.setCacheMode(CacheMode.REPLICATED);
    cacheConfiguration.setName(Constants.ITEM_CACHE_NAME);
    cacheConfiguration.setWriteThrough(false);
    cacheConfiguration.setReadThrough(false);
    cacheConfiguration.setWriteBehindEnabled(false);
    cacheConfiguration.setBackups(1);
    cacheConfiguration.setStatisticsEnabled(true);
    cacheConfiguration.setIndexedTypes(Long.class, Item.class);
    cacheConfigurations.add(cacheConfiguration);
  
    return cacheConfigurations.toArray(new CacheConfiguration[cacheConfigurations.size()]);
  }
}
Item sınıfı şöyle olsun
import org.apache.ignite.cache.query.annotations.QuerySqlField;

@Entity
@Table(name = "items")
public class Item {
  @QuerySqlField(index = true)
  @Id
  @GeneratedValue(strategy = GenerationType.AUTO)
  private long id;
  
  @QuerySqlField(index = true)
  @Column(name = "name", nullable = false)
  private String name;
  
  @Column(name = "sku", nullable = false)
  private String sku;
  
  @QuerySqlField
  @JsonInclude()
  @Transient
  private int stockCount;
  
  @QuerySqlField
  @Column(name = "price", nullable = false)
  private double price;
}
Şöyle yaparız. Ignite Cache'e transaction ile yazma yapılıyor.
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;


@Service
@Log4j
public class IgniteService {

  @Autowired
  private Ignite ignite;

  public void addOrUpdateItemCache(Item item) {
    try (Transaction tx = ignite.transactions() .txStart(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED)) {
      IgniteCache<Long, Item> cache = ignite.cache(Constants.ITEM_CACHE_NAME);
      cache.put(item.getId(), item);
      tx.commit();
      log.debug("Item Cache size after update:" + cache.size());
    }
  }

  public void addItemList(List<Item> items) {
    try (Transaction tx = ignite.transactions() .txStart(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED)) {
      IgniteCache<Long, Item> cache = ignite.cache(Constants.ITEM_CACHE_NAME);
      items.forEach((item) -> cache.put(item.getId(), item));
      tx.commit();
      log.info("[Item Cache size after update:" + cache.size());
    }
  }
}
Sorgulama için şöyle yaparız
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

@Service
@Log4j
public class IgniteCacheServiceImpl implements IgniteCacheService {

  @Autowired
  private Ignite ignite;

  @Override
  public Item getItemFromName(String itemName) throws ResourceNotFoundException {
    IgniteCache<Long, Item> cache = ignite.cache(Constants.ITEM_CACHE_NAME);
    log.debug("Cache size:" + cache.size());
    SqlFieldsQuery sql = new SqlFieldsQuery("SELECT id,price,stockCount FROM item WHERE name = ?").setArgs(itemName);
    try (QueryCursor<List<?>> cursor = cache.query(sql)) {
        for (List<?> row : cursor) {
            Object idObject = row.get(0);
            if (idObject instanceof Number) {
                Item item = new Item();
                item.setId(Long.valueOf(idObject.toString()));
                item.setPrice(Double.valueOf(row.get(1).toString()));
                item.setStockCount(Integer.valueOf(row.get(2).toString()));
                item.setName(itemName);
                log.debug("item found= " + item);
                return item;
            }
        }
    }
    throw new ResourceNotFoundException("Item with the name " + itemName + " not found");
  }

  @Override
  public void updateStockCount(Map<Item, Integer> itemAndQuantityMap) throws OutOfStockException {
    try (Transaction tx = ignite.transactions().txStart(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED)) {
      IgniteCache<Long, Item> cache = ignite.cache(Constants.ITEM_CACHE_NAME);
      for (Map.Entry<Item, Integer> itemAndQuantity : itemAndQuantityMap.entrySet()) {
          Item cachedItem = cache.get(itemAndQuantity.getKey().getId());
          if (cachedItem != null) {
              if (cachedItem.getStockCount() < itemAndQuantity.getValue()) {
                  throw new OutOfStockException("Item out of stock: " + cachedItem.getName());
              }
              cachedItem.setStockCount(cachedItem.getStockCount() - itemAndQuantity.getValue());
              cache.put(itemAndQuantity.getKey().getId(), cachedItem);
          }
      }
      tx.commit();
      log.debug("Item Cache size after update:" + cache.size());
    
  }
}

Hiç yorum yok:

Yorum Gönder