diff --git a/.github/.DS_Store b/.github/.DS_Store new file mode 100644 index 0000000..78f47d4 Binary files /dev/null and b/.github/.DS_Store differ diff --git a/.github/ISSUE_TEMPLATE/bug-report-template.md b/.github/ISSUE_TEMPLATE/bug-report-template.md new file mode 100644 index 0000000..7318888 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/bug-report-template.md @@ -0,0 +1,22 @@ +--- +name: Bug Report Template +about: '버그 리포트 이슈 템플릿 ' +title: '' +labels: '' +assignees: '' + +--- + +## 어떤 버그인가요? + +> 어떤 버그인지 간결하게 설명해주세요 + +## 어떤 상황에서 발생한 버그인가요? + +> (가능하면) Given-When-Then 형식으로 서술해주세요 + +## 예상 결과 + +> 예상했던 정상적인 결과가 어떤 것이었는지 설명해주세요 + +## 참고할만한 자료(선택) \ No newline at end of file diff --git a/.github/ISSUE_TEMPLATE/feature-template.md b/.github/ISSUE_TEMPLATE/feature-template.md new file mode 100644 index 0000000..8e3136d --- /dev/null +++ b/.github/ISSUE_TEMPLATE/feature-template.md @@ -0,0 +1,18 @@ +--- +name: feat issue template +about: for new features +title: "[Feat]" +labels: feat +assignees: '' + +--- + +## 어떤 기능인가요? +> 추가하려는 기능에 대해 간결하게 설명해주세요 + +## 작업 상세 내용 +- [ ] TODO +- [ ] TODO +- [ ] TODO + +## 참고할만한 자료(선택) \ No newline at end of file diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md new file mode 100644 index 0000000..c74c7aa --- /dev/null +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -0,0 +1,15 @@ +## #️⃣연관된 이슈 + +> ex) #이슈번호, #이슈번호 + +## 📝작업 내용 + +> 이번 PR에서 작업한 내용을 간략히 설명해주세요(이미지 첨부 가능) + +### 스크린샷 (선택) + +## 💬리뷰 요구사항(선택) + +> 리뷰어가 특별히 봐주었으면 하는 부분이 있다면 작성해주세요 +> +> ex) 메서드 XXX의 이름을 더 잘 짓고 싶은데 혹시 좋은 명칭이 있을까요? \ No newline at end of file diff --git a/.github/workflows/cd-workflow.yml b/.github/workflows/cd-workflow.yml new file mode 100644 index 0000000..6dbe428 --- /dev/null +++ b/.github/workflows/cd-workflow.yml @@ -0,0 +1,37 @@ +name: CD with Gradle and Docker + +on: + push: + branches: + - 'main' + pull_request: + branches: + - 'main' + +permissions: + contents: read + +jobs: + build-and-deploy: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: ☕️ Set up JDK 17 + uses: actions/setup-java@v3 + with: + java-version: '17' + distribution: 'temurin' + + - name: Grant execute permission for Gradlew + run: chmod +x ./gradlew + + - name: 🐘 Build with Gradle + run: ./gradlew clean build -x test --stacktrace + + - name: 💣 Build and Push Docker Image + run: | + docker login -u ${{ secrets.DOCKER_USERNAME }} -p ${{ secrets.DOCKER_PASSWORD }} + docker build -t ${{ secrets.DOCKER_USERNAME }}/memcached_server:latest . + docker push ${{ secrets.DOCKER_USERNAME }}/memcached_server:latest \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..348e14e --- /dev/null +++ b/Dockerfile @@ -0,0 +1,9 @@ +FROM gradle:8.5-jdk17 AS builder +WORKDIR /app +COPY . . +RUN gradle bootJar --no-daemon --build-cache + +FROM openjdk:17 +WORKDIR /app +COPY --from=builder /app/build/libs/*.jar app.jar +ENTRYPOINT ["java", "-jar", "/app/app.jar"] \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..78b0259 --- /dev/null +++ b/README.md @@ -0,0 +1,244 @@ +# Distributed Memcached Server + +분산 캐시 시스템의 Spring Boot 클라이언트 애플리케이션입니다. Consistent Hashing 알고리즘을 사용하여 여러 Memcached 노드에 데이터를 분산 저장하고, Apache Zookeeper를 통해 노드 상태를 관리합니다. + +## 시스템 아키텍처 + +### 주요 구성 요소 + +1. **ZookeeperConfig**: Zookeeper 연결 설정 및 Bean 등록 +2. **ZookeeperConnection**: Zookeeper 클라이언트 초기화 및 이벤트 핸들러 등록 +3. **ZKEventHandler**: Zookeeper 노드 변화 이벤트 처리 +4. **HashingManager**: Consistent Hashing 기반 노드 분산 관리 +5. **CacheManager**: 캐시 데이터 저장/조회/삭제 인터페이스 + +### 기능 + +- **분산 캐시 저장**: Consistent Hashing을 통한 효율적인 데이터 분산 +- **동적 노드 관리**: Zookeeper를 통한 실시간 노드 추가/제거 감지 +- **자동 리밸런싱**: 노드 변화 시 해시 링 자동 재구성 +- **REST API**: HTTP 기반 캐시 조작 인터페이스 + +## 사전 요구사항 + +- Java 17 이상 +- Apache Zookeeper 3.x +- Memcached 1.4 이상 +- Docker (Memcached 노드 실행용) + +## 설치 및 실행 + +### 1. Zookeeper 설치 및 실행 + +```bash +# Zookeeper 다운로드 및 실행 +wget https://downloads.apache.org/zookeeper/zookeeper-3.8.3/apache-zookeeper-3.8.3-bin.tar.gz +tar -xzf apache-zookeeper-3.8.3-bin.tar.gz +cd apache-zookeeper-3.8.3-bin + +# 설정 파일 복사 +cp conf/zoo_sample.cfg conf/zoo.cfg + +# Zookeeper 실행 +bin/zkServer.sh start +``` + +### 2. Memcached 노드 실행 (Docker) + +```bash +# 첫 번째 Memcached 노드 +docker run -d --name memcached1 -p 11211:11211 memcached:latest + +# 두 번째 Memcached 노드 +docker run -d --name memcached2 -p 11212:11211 memcached:latest + +# 세 번째 Memcached 노드 +docker run -d --name memcached3 -p 11213:11211 memcached:latest +``` + +### 3. Zookeeper에 노드 등록 + +```bash +# Zookeeper CLI 접속 +bin/zkCli.sh + +# Memcached 경로 생성 +create /memcached "" + +# 노드 등록 (데이터에 host:port 형식으로) +create /memcached/node1 "127.0.0.1:11211" +create /memcached/node2 "127.0.0.1:11212" +create /memcached/node3 "127.0.0.1:11213" +``` + +### 4. Spring Boot 애플리케이션 실행 + +```bash +# 프로젝트 빌드 및 실행 +./gradlew bootRun +``` + +## API 사용법 + +### 데이터 저장 + +```bash +curl -X POST http://localhost:8080/api/cache/mykey \ + -H "Content-Type: application/json" \ + -d '"Hello, World!"' +``` + +### 데이터 조회 + +```bash +curl -X GET http://localhost:8080/api/cache/mykey +``` + +### 데이터 삭제 + +```bash +curl -X DELETE http://localhost:8080/api/cache/mykey +``` + +### 캐시 상태 확인 + +```bash +curl -X GET http://localhost:8080/api/cache/status +``` + +### 헬스 체크 + +```bash +curl -X GET http://localhost:8080/api/cache/health +``` + +## 설정 파일 + +`src/main/resources/application.yml`: + +```yaml +# Zookeeper Configuration +zookeeper: + address: localhost:2181 + +memcached: + path: /memcached + +# Server Configuration +server: + port: 8080 + +# Logging Configuration +logging: + level: + com.DSC.distributedCache: INFO + org.apache.curator: WARN + net.spy.memcached: WARN + +# Application Configuration +spring: + application: + name: distributed-cache-server + +# Health Check Configuration +management: + endpoints: + web: + exposure: + include: health,info + endpoint: + health: + show-details: always +``` + +## 시퀀스 다이어그램 + +### 데이터 저장 시퀀스 +1. Client → CacheManager: addData(key, value) +2. CacheManager → HashingManager: getNode(key) +3. HashingManager → CacheManager: MemcachedClient +4. CacheManager → MemcachedClient: set(key, expire, value) +5. MemcachedClient → CacheManager: isAdded + +### 데이터 조회 시퀀스 +1. Client → CacheManager: getData(key) +2. CacheManager → HashingManager: getNode(key) +3. HashingManager → CacheManager: MemcachedClient +4. CacheManager → MemcachedClient: get(key) +5. MemcachedClient → CacheManager: data + +### 노드 변화 처리 시퀀스 +1. Zookeeper → ZKEventHandler: childEvent() +2. ZKEventHandler → HashingManager: addNode() or removeNode() +3. HashingManager: 해시 링 재구성 + +## 테스트 + +### 노드 동적 추가 테스트 + +```bash +# 새 Memcached 노드 실행 +docker run -d --name memcached4 -p 11214:11211 memcached:latest + +# Zookeeper에 노드 추가 +bin/zkCli.sh +create /memcached/node4 "127.0.0.1:11214" +``` + +### 노드 제거 테스트 + +```bash +# Zookeeper에서 노드 제거 +bin/zkCli.sh +delete /memcached/node4 + +# Memcached 컨테이너 중지 +docker stop memcached4 +``` + +## 트러블슈팅 + +### Zookeeper 연결 실패 +- Zookeeper 서버 상태 확인: `bin/zkServer.sh status` +- 방화벽 설정 확인 +- 포트 2181 사용 가능 여부 확인 + +### Memcached 연결 실패 +- Docker 컨테이너 상태 확인: `docker ps` +- 포트 바인딩 확인 +- Memcached 로그 확인: `docker logs ` + +### 해시 링 불일치 +- 애플리케이션 재시작 +- Zookeeper 노드 등록 상태 확인: `ls /memcached` + +## 개발자 가이드 + +### 패키지 구조 + +``` +com.DSC.distributedCache/ +├── config/ +│ └── ZookeeperConfig.java +├── zookeeper/ +│ ├── ZookeeperConnection.java +│ └── ZKEventHandler.java +├── hashing/ +│ └── HashingManager.java +├── cache/ +│ └── CacheManager.java +├── controller/ +│ └── CacheController.java +└── DistributedCacheApplication.java +``` + +### 확장 포인트 + +1. **다른 해싱 알고리즘**: `HashingManager.hashing()` 메서드 수정 +2. **추가 캐시 백엔드**: `CacheManager`에 새로운 클라이언트 추가 +3. **모니터링**: Spring Actuator를 통한 메트릭 추가 +4. **보안**: Spring Security를 통한 인증/인가 추가 + +## 라이선스 + +이 프로젝트는 MIT 라이선스를 따릅니다. \ No newline at end of file diff --git a/build.gradle b/build.gradle index 2cbb27e..8f73942 100644 --- a/build.gradle +++ b/build.gradle @@ -26,6 +26,17 @@ repositories { dependencies { implementation 'org.springframework.boot:spring-boot-starter-thymeleaf' implementation 'org.springframework.boot:spring-boot-starter-web' + + // Zookeeper dependencies + implementation 'org.apache.curator:curator-framework:5.4.0' + implementation 'org.apache.curator:curator-recipes:5.4.0' + + // Memcached client + implementation 'net.spy:spymemcached:2.12.3' + + // For consistent hashing + implementation 'com.google.guava:guava:32.1.2-jre' + compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' testImplementation 'org.springframework.boot:spring-boot-starter-test' diff --git a/src/main/java/com/DSC/distributedCache/cache/CacheManager.java b/src/main/java/com/DSC/distributedCache/cache/CacheManager.java new file mode 100644 index 0000000..687740c --- /dev/null +++ b/src/main/java/com/DSC/distributedCache/cache/CacheManager.java @@ -0,0 +1,134 @@ +package com.DSC.distributedCache.cache; + +import com.DSC.distributedCache.hashing.HashingManager; +import net.spy.memcached.MemcachedClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +@Service +public class CacheManager { + + @Autowired + private HashingManager hashingManager; + + private static final int DEFAULT_EXPIRY = 3600; // 1 hour in seconds + private static final int OPERATION_TIMEOUT = 5; // 5 seconds timeout + + /** + * Add data to the distributed cache + * @param key Cache key + * @param value Value to store + * @return true if successfully added, false otherwise + */ + public boolean addData(String key, T value) { + return addData(key, value, DEFAULT_EXPIRY); + } + + /** + * Add data to the distributed cache with custom expiry + * @param key Cache key + * @param value Value to store + * @param expire Expiry time in seconds + * @return true if successfully added, false otherwise + */ + public boolean addData(String key, T value, int expire) { + try { + MemcachedClient client = hashingManager.getNode(key); + if (client == null) { + System.err.println("No available Memcached nodes for key: " + key); + return false; + } + + Future future = client.set(key, expire, value); + Boolean result = future.get(OPERATION_TIMEOUT, TimeUnit.SECONDS); + + System.out.println("Added data for key: " + key + ", result: " + result); + return result != null && result; + + } catch (InterruptedException | ExecutionException | TimeoutException e) { + System.err.println("Failed to add data for key: " + key + ", error: " + e.getMessage()); + return false; + } + } + + /** + * Remove data from the distributed cache + * @param key Cache key + * @return true if successfully removed, false otherwise + */ + public boolean removeData(String key) { + try { + MemcachedClient client = hashingManager.getNode(key); + if (client == null) { + System.err.println("No available Memcached nodes for key: " + key); + return false; + } + + Future future = client.delete(key); + Boolean result = future.get(OPERATION_TIMEOUT, TimeUnit.SECONDS); + + System.out.println("Removed data for key: " + key + ", result: " + result); + return result != null && result; + + } catch (InterruptedException | ExecutionException | TimeoutException e) { + System.err.println("Failed to remove data for key: " + key + ", error: " + e.getMessage()); + return false; + } + } + + /** + * Get data from the distributed cache + * @param key Cache key + * @return Cached value or null if not found + */ + @SuppressWarnings("unchecked") + public T getData(String key) { + try { + MemcachedClient client = hashingManager.getNode(key); + if (client == null) { + System.err.println("No available Memcached nodes for key: " + key); + return null; + } + + Object result = client.get(key); + + System.out.println("Retrieved data for key: " + key + ", found: " + (result != null)); + return (T) result; + + } catch (Exception e) { + System.err.println("Failed to get data for key: " + key + ", error: " + e.getMessage()); + return null; + } + } + + /** + * Check if a key exists in the cache + * @param key Cache key + * @return true if key exists, false otherwise + */ + public boolean exists(String key) { + return getData(key) != null; + } + + /** + * Get cache statistics + * @return String representation of cache status + */ + public String getCacheStatus() { + StringBuilder status = new StringBuilder(); + status.append("Cache Status:\n"); + status.append("Active Memcached Nodes: ").append(hashingManager.getMemcachedClients().size()).append("\n"); + status.append("Hash Ring Entries: ").append(hashingManager.getHashRing().size()).append("\n"); + + for (String nodeId : hashingManager.getMemcachedClients().keySet()) { + status.append("Node: ").append(nodeId).append("\n"); + } + + return status.toString(); + } +} \ No newline at end of file diff --git a/src/main/java/com/DSC/distributedCache/config/ZookeeperConfig.java b/src/main/java/com/DSC/distributedCache/config/ZookeeperConfig.java new file mode 100644 index 0000000..34c08af --- /dev/null +++ b/src/main/java/com/DSC/distributedCache/config/ZookeeperConfig.java @@ -0,0 +1,21 @@ +package com.DSC.distributedCache.config; + +import com.DSC.distributedCache.zookeeper.ZookeeperConnection; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class ZookeeperConfig { + + @Value("${zookeeper.address:zookeeper:2181}") + private String ZK_ADDRESS; + + @Value("${memcached.path:/memcached}") + private String MEMCACHED_PATH; + + @Bean + public ZookeeperConnection createZookeeperConnection() { + return new ZookeeperConnection(ZK_ADDRESS, MEMCACHED_PATH); + } +} \ No newline at end of file diff --git a/src/main/java/com/DSC/distributedCache/controller/CacheController.java b/src/main/java/com/DSC/distributedCache/controller/CacheController.java new file mode 100644 index 0000000..e142874 --- /dev/null +++ b/src/main/java/com/DSC/distributedCache/controller/CacheController.java @@ -0,0 +1,110 @@ +package com.DSC.distributedCache.controller; + +import com.DSC.distributedCache.cache.CacheManager; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.*; + +import java.util.HashMap; +import java.util.Map; + +@RestController +@RequestMapping("/api/cache") +public class CacheController { + + @Autowired + private CacheManager cacheManager; + + /** + * Add data to cache + * POST /api/cache/{key} + */ + @PostMapping("/{key}") + public ResponseEntity> addData( + @PathVariable String key, + @RequestBody Object value, + @RequestParam(defaultValue = "3600") int expire) { + + Map response = new HashMap<>(); + + boolean success = cacheManager.addData(key, value, expire); + + response.put("success", success); + response.put("message", success ? "Data added successfully" : "Failed to add data"); + response.put("key", key); + + return ResponseEntity.ok(response); + } + + /** + * Get data from cache + * GET /api/cache/{key} + */ + @GetMapping("/{key}") + public ResponseEntity> getData(@PathVariable String key) { + Map response = new HashMap<>(); + + Object data = cacheManager.getData(key); + + response.put("key", key); + response.put("found", data != null); + response.put("data", data); + + return ResponseEntity.ok(response); + } + + /** + * Remove data from cache + * DELETE /api/cache/{key} + */ + @DeleteMapping("/{key}") + public ResponseEntity> removeData(@PathVariable String key) { + Map response = new HashMap<>(); + + boolean success = cacheManager.removeData(key); + + response.put("success", success); + response.put("message", success ? "Data removed successfully" : "Failed to remove data"); + response.put("key", key); + + return ResponseEntity.ok(response); + } + + /** + * Check if key exists in cache + * HEAD /api/cache/{key} + */ + @RequestMapping(value = "/{key}", method = RequestMethod.HEAD) + public ResponseEntity existsData(@PathVariable String key) { + boolean exists = cacheManager.exists(key); + return exists ? ResponseEntity.ok().build() : ResponseEntity.notFound().build(); + } + + /** + * Get cache status + * GET /api/cache/status + */ + @GetMapping("/status") + public ResponseEntity> getCacheStatus() { + Map response = new HashMap<>(); + + String status = cacheManager.getCacheStatus(); + + response.put("status", status); + response.put("timestamp", System.currentTimeMillis()); + + return ResponseEntity.ok(response); + } + + /** + * Health check endpoint + * GET /api/cache/health + */ + @GetMapping("/health") + public ResponseEntity> healthCheck() { + Map response = new HashMap<>(); + response.put("status", "UP"); + response.put("service", "Distributed Cache"); + return ResponseEntity.ok(response); + } +} \ No newline at end of file diff --git a/src/main/java/com/DSC/distributedCache/hashing/HashingManager.java b/src/main/java/com/DSC/distributedCache/hashing/HashingManager.java new file mode 100644 index 0000000..60f4872 --- /dev/null +++ b/src/main/java/com/DSC/distributedCache/hashing/HashingManager.java @@ -0,0 +1,101 @@ +package com.DSC.distributedCache.hashing; + +import lombok.Getter; +import net.spy.memcached.MemcachedClient; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; + +@Component +public class HashingManager { + + private int numberOfReplicas = 160; + @Getter + private NavigableMap hashRing = new TreeMap<>(); + @Getter + private Map memcachedClients = new ConcurrentHashMap<>(); + + public void addNode(String node) { + try { + // Parse node address (e.g., "127.0.0.1:11211") + System.out.println("Adding node: " + node); + String[] parts = node.split(":"); + if(parts.length != 2) { + // 예외 처리: 잘못된 노드 데이터! + throw new IllegalArgumentException("node 데이터 형식이 잘못됨: " + node); + } + String host = parts[0]; + int port = Integer.parseInt(parts[1]); + + // Create MemcachedClient for this node + MemcachedClient client = new MemcachedClient(new InetSocketAddress(host, port)); + memcachedClients.put(node, client); + + // Add virtual nodes to hash ring + for (int i = 0; i < numberOfReplicas; i++) { + int hash = hashing(node + ":" + i); + hashRing.put(hash, node); + } + + System.out.println("Added node: " + node + " to hash ring"); + } catch (Exception e) { + System.err.println("Failed to create MemcachedClient for node: " + node); + e.printStackTrace(); + } + } + + public void removeNode(String node) { + // Remove virtual nodes from hash ring + for (int i = 0; i < numberOfReplicas; i++) { + int hash = hashing(node + ":" + i); + hashRing.remove(hash); + } + + // Close and remove MemcachedClient + MemcachedClient client = memcachedClients.remove(node); + if (client != null) { + client.shutdown(); + } + + System.out.println("Removed node: " + node + " from hash ring"); + } + + public MemcachedClient getNode(String key) { + if (hashRing.isEmpty()) { + return null; + } + + int hash = hashing(key); + + // Find the first node in the ring that is >= hash + NavigableMap tailMap = hashRing.tailMap(hash, true); + String nodeId = tailMap.isEmpty() ? hashRing.firstEntry().getValue() : tailMap.firstEntry().getValue(); + + return memcachedClients.get(nodeId); + } + + public int hashing(String key) { + try { + MessageDigest md = MessageDigest.getInstance("MD5"); + byte[] digest = md.digest(key.getBytes()); + + // Convert first 4 bytes to int + int hash = 0; + for (int i = 0; i < 4; i++) { + hash = (hash << 8) | (digest[i] & 0xFF); + } + + return Math.abs(hash); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException("MD5 algorithm not available", e); + } + } + +} \ No newline at end of file diff --git a/src/main/java/com/DSC/distributedCache/zookeeper/ZKEventHandler.java b/src/main/java/com/DSC/distributedCache/zookeeper/ZKEventHandler.java new file mode 100644 index 0000000..7ae7eaa --- /dev/null +++ b/src/main/java/com/DSC/distributedCache/zookeeper/ZKEventHandler.java @@ -0,0 +1,95 @@ +package com.DSC.distributedCache.zookeeper; + +import com.DSC.distributedCache.hashing.HashingManager; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class ZKEventHandler { + + @Autowired + private HashingManager hashingManager; + + public void childAddedEventHandler(ChildData newData) { + if (newData != null) { + String nodePath = newData.getPath(); + String nodeData = new String(newData.getData()); + + System.out.println("Child added: " + nodePath + " with data: " + nodeData); + + // Extract node address from path or data and add to hash ring + String nodeAddress = extractNodeAddress(nodePath, nodeData); + if (nodeAddress != null) { + hashingManager.addNode(nodeAddress); + } + } + } + + public void childRemovedEventHandler(ChildData oldData) { + if (oldData != null) { + String nodePath = oldData.getPath(); + String nodeData = new String(oldData.getData()); + + System.out.println("Child removed: " + nodePath + " with data: " + nodeData); + + // Extract node address from path or data and remove from hash ring + String nodeAddress = extractNodeAddress(nodePath, nodeData); + if (nodeAddress != null) { + hashingManager.removeNode(nodeAddress); + } + } + } + + public void childUpdatedEventHandler(ChildData oldData, ChildData newData) { + if (newData != null) { + String nodePath = newData.getPath(); + String nodeData = new String(newData.getData()); + + System.out.println("Child updated: " + nodePath + " with data: " + nodeData); + + // Handle node updates if necessary + // For now, we'll just log the event + } + } + + public void connectionLostEventHandler() { + System.out.println("Connection lost to Zookeeper"); + // Handle connection loss - possibly pause operations + } + + public void connectionReconnectedEventHandler() { + System.out.println("Connection reconnected to Zookeeper"); + // Handle reconnection - resume operations + } + + public void connectionSuspendedEventHandler() { + System.out.println("Connection suspended to Zookeeper"); + // Handle suspended connection + } + + public void initializedEventHandler() { + System.out.println("Zookeeper cache initialized"); + // Cache has been initialized and populated + } + + private String extractNodeAddress(String nodePath, String nodeData) { + // Extract node address from Zookeeper path or data + // Assuming the data contains the address in format "host:port" + if (nodeData != null && !nodeData.trim().isEmpty()) { + return nodeData.trim(); + } + + // If data is empty, try to extract from path + // Assuming path format like "/memcached/node-127.0.0.1:11211" + String[] pathParts = nodePath.split("/"); + if (pathParts.length > 0) { + String lastPart = pathParts[pathParts.length - 1]; + if (lastPart.contains("-")) { + return lastPart.substring(lastPart.indexOf("-") + 1); + } + } + + return null; + } +} \ No newline at end of file diff --git a/src/main/java/com/DSC/distributedCache/zookeeper/ZookeeperConnection.java b/src/main/java/com/DSC/distributedCache/zookeeper/ZookeeperConnection.java new file mode 100644 index 0000000..d1e9092 --- /dev/null +++ b/src/main/java/com/DSC/distributedCache/zookeeper/ZookeeperConnection.java @@ -0,0 +1,116 @@ +package com.DSC.distributedCache.zookeeper; + +import lombok.Getter; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; + +public class ZookeeperConnection { + + // Getters + @Getter + private CuratorFramework zookeeperObj; + @Getter + private CuratorCache cacheCluster; + + @Autowired + private ApplicationContext applicationContext; + + @Getter + private ZKEventHandler zkEventHandler; + + private final String zkAddress; + private final String memcachedPath; + + public ZookeeperConnection(String zkAddress, String memcachedPath) { + this.zkAddress = zkAddress; + this.memcachedPath = memcachedPath; + } + + @PostConstruct + public void initialize() { + + System.out.println("ZK Address: " + zkAddress); + + try { + // Get ZKEventHandler from Spring context + zkEventHandler = applicationContext.getBean(ZKEventHandler.class); + + // Create Zookeeper client with retry policy + zookeeperObj = CuratorFrameworkFactory.newClient( + zkAddress, + new ExponentialBackoffRetry(1000, 3) + ); + + // Start the client + zookeeperObj.start(); + zookeeperObj.blockUntilConnected(); + + System.out.println("Connected to Zookeeper at: " + zkAddress); + + // Ensure the memcached path exists + if (zookeeperObj.checkExists().forPath(memcachedPath) == null) { + zookeeperObj.create().creatingParentsIfNeeded().forPath(memcachedPath); + System.out.println("Created path: " + memcachedPath); + } + + // Register watcher + registerWatcher(); + + } catch (Exception e) { + System.err.println("Failed to initialize Zookeeper connection: " + e.getMessage()); + e.printStackTrace(); + } + } + + public void registerWatcher() { + try { + // Create CuratorCache to monitor children of memcached path + cacheCluster = CuratorCache.build(zookeeperObj, memcachedPath); + + // Add listener using CuratorCacheListener + CuratorCacheListener listener = CuratorCacheListener.builder() + .forCreates(zkEventHandler::childAddedEventHandler) + .forDeletes(zkEventHandler::childRemovedEventHandler) + .forChanges(zkEventHandler::childUpdatedEventHandler) + .build(); + + cacheCluster.listenable().addListener(listener); + + // Start the cache + cacheCluster.start(); + + System.out.println("Registered watcher for path: " + memcachedPath); + + } catch (Exception e) { + System.err.println("Failed to register watcher: " + e.getMessage()); + e.printStackTrace(); + } + } + + @PreDestroy + public void cleanup() { + try { + if (cacheCluster != null) { + cacheCluster.close(); + } + + if (zookeeperObj != null) { + zookeeperObj.close(); + } + + System.out.println("Zookeeper connection closed"); + + } catch (Exception e) { + System.err.println("Error during cleanup: " + e.getMessage()); + e.printStackTrace(); + } + } +} \ No newline at end of file diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties deleted file mode 100644 index 4bb83f0..0000000 --- a/src/main/resources/application.properties +++ /dev/null @@ -1 +0,0 @@ -spring.application.name=distributedCache diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..992997f --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,32 @@ +# Zookeeper Configuration +zookeeper: + address: zookeeper:2181 + +memcached: + path: /memcached + +# Server Configuration +server: + port: 8080 + +# Logging Configuration +logging: + level: + com.DSC.distributedCache: INFO + org.apache.curator: WARN + net.spy.memcached: WARN + +# Application Configuration +spring: + application: + name: distributed-cache-server + +# Health Check Configuration +management: + endpoints: + web: + exposure: + include: health,info + endpoint: + health: + show-details: always \ No newline at end of file