JSON-OBJECT Software Engineering Blog

Professional senior backend engineer specializing in high-volume traffic and distributed processing, with Kotlin and Spring Boot as core technologies.

4 April 2023

Implementing a Distributed Cache with Spring Boot and Hazelcast

by Taehyeong Lee

Overview

Adding Library Dependencies

dependencies {
    implementation("com.hazelcast:hazelcast:5.2.3")
    
    // Added so that Smile can be used as the serialization format when storing objects in the distributed cache
    implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-smile:2.14.3")
}

Writing the @Configuration Class

import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.MapperFeature
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.dataformat.smile.databind.SmileMapper
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.hazelcast.config.Config
import com.hazelcast.core.Hazelcast
import com.hazelcast.core.HazelcastInstance
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class HazelcastConfig {

    @Bean
    fun hazelcastInstance(): HazelcastInstance {

        return Hazelcast.newHazelcastInstance(
            Config().apply {
                this.clusterName = "foo-dev"
            }
        )
    }
    
    // Added so that Smile can be used as the serialization format when storing objects in the distributed cache
    @Bean("smileObjectMapper")
    fun smileObjectMapper(): ObjectMapper {

        return SmileMapper().registerModules(JavaTimeModule())
            .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, true)
            .configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false)
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
            .setSerializationInclusion(JsonInclude.Include.ALWAYS)
            .disable(MapperFeature.USE_ANNOTATIONS)
    }
}
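
As a rough illustration of what the Smile mapper does with a cached object, the sketch below encodes a value to Smile bytes and decodes it back. `SampleValue`, `newSmileMapper` and `roundTrip` are hypothetical names introduced only for this example, and the mapper here registers just `JavaTimeModule` rather than reproducing every option of the bean above.

```kotlin
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.dataformat.smile.databind.SmileMapper
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import java.time.Instant

// Hypothetical stand-in for a cached value; not a class from the original post.
// Default values on every property give Jackson a no-arg constructor to work with.
data class SampleValue(
    var id: Long = 0,
    var label: String? = null,
    var createdAt: Instant = Instant.EPOCH
)

// Builds a Smile mapper with java.time support (assumption: a reduced version of the bean above).
fun newSmileMapper(): ObjectMapper = SmileMapper().registerModules(JavaTimeModule())

// Encodes the value to Smile bytes, then decodes the bytes back into an object.
fun roundTrip(mapper: ObjectMapper, value: SampleValue): SampleValue {
    val bytes: ByteArray = mapper.writeValueAsBytes(value)
    return mapper.readValue(bytes, SampleValue::class.java)
}

fun main() {
    val original = SampleValue(id = 7, label = "seven", createdAt = Instant.parse("2023-04-04T14:34:38Z"))
    val restored = roundTrip(newSmileMapper(), original)
    // Prints whether the decoded object equals the original one.
    println(restored == original)
}
```

The byte array produced this way is what ends up as the map value in the cache, so Hazelcast itself only ever sees `ByteArray` and never needs the DTO class on its classpath.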

Writing the @Configuration Class: Huawei Cloud ECS Environment

Writing the @Configuration Class: Amazon ECS Environment

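The ECS task role needs the following IAM permissions so that Hazelcast's AWS discovery can look up the other tasks in the service:

ecs:ListTasks
ecs:DescribeTasks
ec2:DescribeNetworkInterfaces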
    @Bean
    fun hazelcastInstance(): HazelcastInstance {

        return Hazelcast.newHazelcastInstance(
            Config().apply {
                this.clusterName = "foo-dev"
                this.networkConfig.join.multicastConfig.isEnabled = false
                this.networkConfig.join.awsConfig.apply {
                    isEnabled = true
                    isUsePublicIp = false
                    setProperty("connection-retries", "2")
                    setProperty("connection-timeout-seconds", "3")                    

                    // Enter the actual service name of the ECS instance
                    setProperty("service-name", "foo-dev")
                }
                this.networkConfig.interfaces
                    .setEnabled(true)

                    // Enter an interface pattern matching the actual VPC CIDR block of the ECS instance
                    .addInterface("10.0.*.*")
            }
        )
    }
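# Open the TCP 5701-5801 port range for inbound traffic
# (0.0.0.0/0 and ::/0 allow any source; restricting the source to the VPC CIDR is safer in production)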
$ aws ec2 authorize-security-group-ingress \
    --group-id {security-group-id} \
    --ip-permissions '[{"IpProtocol": "tcp", "FromPort": 5701, "ToPort": 5801, "Ipv6Ranges": [{"CidrIpv6": "::/0"}], "IpRanges": [{"CidrIp": "0.0.0.0/0"}]}]'

Writing a Health Check API for the Distributed Cache State

import com.hazelcast.core.HazelcastInstance
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.*

@RestController
class SystemCacheController(
    private val hazelcastInstance: HazelcastInstance
) {
    @GetMapping("/cache-state")
    fun getCacheState(): ResponseEntity<Any> {

        return ResponseEntity.ok(
            mapOf(
                "clusterState" to hazelcastInstance.cluster.clusterState,
                "memberSize" to hazelcastInstance.cluster.members.size,
                "members" to hazelcastInstance.cluster.members.map {
                    mapOf(
                        "host" to it.address.host,
                        "port" to it.address.port,
                        "isIpv4" to it.address.isIPv4,
                        "isIpv6" to it.address.isIPv6,
                        "isLocalMember" to it.localMember(),
                        "isLiteMember" to it.isLiteMember
                    )
                }
            )
        )
    }
}

Writing the FooDTO Data Class

import java.io.Serializable
import java.time.Instant

data class FooDTO(

    var id: Long = 1,
    var name: String? = null,
    var createdAt: Instant = Instant.now()

) : Serializable

Writing the FooController Controller Class

import com.fasterxml.jackson.databind.ObjectMapper
import com.hazelcast.core.HazelcastInstance
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.*
import java.util.concurrent.TimeUnit

@RestController
class FooController(
    private val hazelcastInstance: HazelcastInstance,
    @Qualifier("smileObjectMapper") private val smileObjectMapper: ObjectMapper
) {
    @GetMapping("/foos/{id}")
    fun getFoo(@PathVariable id: Long): ResponseEntity<Any> {

        val fooMap = hazelcastInstance.getMap<Long, ByteArray>("foo-map")

        // Return 404 when the key is absent; otherwise decode the Smile bytes back into a FooDTO
        val bytes = fooMap[id] ?: return ResponseEntity.notFound().build()

        return ResponseEntity.ok(smileObjectMapper.readValue(bytes, FooDTO::class.java))
    }

    @PostMapping("/foos")
    fun setFoo(@RequestBody foo: FooDTO): ResponseEntity<Any> {

        val fooMap = hazelcastInstance.getMap<Long, ByteArray>("foo-map")

        fooMap.set(foo.id, smileObjectMapper.writeValueAsBytes(foo), 2, TimeUnit.HOURS)

        return ResponseEntity.noContent().build()
    }
}
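
The controller above repeats the map lookup and the Smile encoding in each handler. One possible way to keep that in a single place is a small wrapper like the sketch below. `SmileCache` and its parameters are hypothetical names that do not appear in the original post; it only reuses the `getMap` and `set(key, value, ttl, unit)` calls already shown in the controller.

```kotlin
import com.fasterxml.jackson.databind.ObjectMapper
import com.hazelcast.core.HazelcastInstance
import com.hazelcast.map.IMap
import java.util.concurrent.TimeUnit

// Hypothetical helper (not part of the original post): stores values as Smile-encoded
// byte arrays in a Hazelcast map and decodes them again on read.
class SmileCache<T : Any>(
    private val hazelcastInstance: HazelcastInstance,
    private val smileObjectMapper: ObjectMapper,
    private val mapName: String,
    private val valueType: Class<T>,
    private val ttlHours: Long = 2 // assumption: mirrors the 2-hour TTL used in the controller
) {
    // Looked up on each access, as the controller does; Hazelcast returns a proxy to the same distributed map.
    private val map: IMap<Long, ByteArray>
        get() = hazelcastInstance.getMap(mapName)

    // Returns the decoded value, or null when the key is absent or expired.
    fun find(id: Long): T? =
        map[id]?.let { smileObjectMapper.readValue(it, valueType) }

    // Encodes the value and stores it with the configured time-to-live.
    fun save(id: Long, value: T) {
        map.set(id, smileObjectMapper.writeValueAsBytes(value), ttlHours, TimeUnit.HOURS)
    }
}
```

With such a helper, the controller could build `SmileCache(hazelcastInstance, smileObjectMapper, "foo-map", FooDTO::class.java)` once and call `find(id)` and `save(foo.id, foo)` from its handlers.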

Running the Application

# Run instance 1
$ SERVER_PORT=8080 ./gradlew bootRun

# Run instance 2
$ SERVER_PORT=8081 ./gradlew bootRun

Testing the Distributed Cache

# Store a cache entry through instance 1
$ curl -i -X POST -H "Content-Type:application/json" -d \
'{
  "id": 1,
  "name": "One"
}
' 'http://localhost:8080/foos'

# Read the cache entry through instance 2
$ curl -i -X GET 'http://localhost:8081/foos/1'
{
    "id": 1,
    "name": "One",
    "createdAt": "2023-04-04T14:34:38.091031743Z"
}

Reliable Topic: A Distributed Pub-Sub Messaging Model

import com.hazelcast.core.HazelcastInstance
import com.hazelcast.topic.Message
import com.hazelcast.topic.MessageListener
import org.springframework.context.annotation.Lazy
import org.springframework.stereotype.Service

@Service
class FooService(
    @Lazy private val hazelcastInstance: HazelcastInstance
) : MessageListener<FooDTO> {

    // [1] Publishes a message synchronously.
    fun publishMessage(message: FooDTO) {

        // If the member is in the process of leaving the cluster, this throws com.hazelcast.core.HazelcastInstanceNotActiveException: Hazelcast instance is not active!
        hazelcastInstance
            .getReliableTopic<FooDTO>("foo-topic")
            .publish(message)
    }

    // [1] Publishes a message asynchronously.
    fun publishMessageAsync(message: FooDTO) {

        // If the member is in the process of leaving the cluster, this throws com.hazelcast.core.HazelcastInstanceNotActiveException: Hazelcast instance is not active!
        hazelcastInstance
            .getReliableTopic<FooDTO>("foo-topic")
            .publishAsync(message)
    }

    // [2] Receives a published message.
    // Note that this runs on a Hazelcast-managed thread whose name starts with hz.
    override fun onMessage(message: Message<FooDTO>) {

        println(message.publishingMember)
        println(message.publishTime)
        println(message.messageObject)
    }
}
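
import com.hazelcast.config.Config
import com.hazelcast.core.Hazelcast
import com.hazelcast.core.HazelcastInstance
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration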
class HazelcastConfig(
    private val fooService: FooService
) {
    @Bean
    fun hazelcastInstance(): HazelcastInstance {

        return Hazelcast.newHazelcastInstance(
            Config().apply {
                this.clusterName = "foo-dev"
            }  
        )
        // Subscribe to the topic as soon as the member has joined the cluster.
        .also { cache ->
            cache
                .getReliableTopic<FooDTO>("foo-topic")
                .addMessageListener(fooService)
        }
    }
}
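
Because `onMessage` runs on a Hazelcast-managed `hz.` thread, slow handling there can hold up that thread. One option, sketched below under that assumption, is to hand each message off to an application-owned executor. `OffloadingListener` is a hypothetical name and is not part of the original post; it relies only on the `MessageListener` and `Message` types already imported by `FooService`.

```kotlin
import com.hazelcast.topic.Message
import com.hazelcast.topic.MessageListener
import java.util.concurrent.ExecutorService

// Hypothetical wrapper (not from the original post): receives the message on the
// Hazelcast thread, then runs the actual handler on an application-owned executor.
class OffloadingListener<T>(
    private val executor: ExecutorService,
    private val handler: (T) -> Unit
) : MessageListener<T> {

    override fun onMessage(message: Message<T>) {
        // Extract the payload on the hz. thread; this part is cheap.
        val payload = message.messageObject
        // Hand the potentially slow work over to the application's own thread pool.
        executor.execute { handler(payload) }
    }
}
```

It could be registered in place of `fooService`, for example `addMessageListener(OffloadingListener(Executors.newFixedThreadPool(4)) { foo -> println(foo) })`. The trade-off is that ordering across messages is only preserved when the executor is single-threaded.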

Graceful Shutdown

# [1] Enable Spring Boot's graceful shutdown via environment variables
SERVER_SHUTDOWN=graceful
SPRING_LIFECYCLE_TIMEOUT_PER_SHUTDOWN_PHASE=30s

# [2] Disable Hazelcast's own JVM shutdown hook via JVM parameters
# The SLF4J logging type must be enabled for the graceful shutdown logs to be recorded properly
-Dhazelcast.shutdownhook.enabled=false -Dhazelcast.logging.type=slf4j -Dlogging.level.com.hazelcast=INFO

// Explicitly run a graceful shutdown of the cluster member
hazelcastInstance.shutdown()
[hz.zealous_keldysh.async.thread-2] INFO  c.hazelcast.core.LifecycleService - [192.168.0.2]:5701 [foo-dev] [5.2.3] [192.168.0.2]:5701 is SHUTTING_DOWN
[hz.charming_montalcini.migration] INFO  c.h.i.p.impl.MigrationManager - [192.168.0.2]:5701 [foo-dev] [5.2.3] Repartitioning cluster data. Migration tasks count: 204
[hz.charming_montalcini.migration] INFO  c.h.i.p.impl.MigrationManager - [192.168.0.2]:5701 [foo-dev] [5.2.3] All migration tasks have been completed. (repartitionTime=Fri May 19 01:56:06 UTC 2023, plannedMigrations=204, completedMigrations=204, remainingMigrations=0, totalCompletedMigrations=1425)
[hz.zealous_keldysh.async.thread-2] INFO  com.hazelcast.instance.impl.Node - [192.168.0.2]:5701 [foo-dev] [5.2.3] Shutting down multicast service...
[hz.zealous_keldysh.async.thread-2] INFO  com.hazelcast.instance.impl.Node - [192.168.0.2]:5701 [foo-dev] [5.2.3] Shutting down connection manager...
[hz.zealous_keldysh.async.thread-2] INFO  c.h.instance.impl.NodeExtension - [192.168.0.2]:5701 [foo-dev] [5.2.3] Destroying node NodeExtension.
[hz.zealous_keldysh.async.thread-2] INFO  com.hazelcast.instance.impl.Node - [192.168.0.2]:5701 [foo-dev] [5.2.3] Hazelcast Shutdown is completed in 99 ms.
[hz.zealous_keldysh.async.thread-2] INFO  c.hazelcast.core.LifecycleService - [192.168.0.2]:5701 [foo-dev] [5.2.3] [192.168.0.2]:5701 is SHUTDOWN
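
The post does not show where the explicit `hazelcastInstance.shutdown()` call lives. One possible placement, sketched here, is a Spring bean that implements `DisposableBean` so the call runs when the application context closes. `HazelcastGracefulShutdown` is a hypothetical name; `DisposableBean` is used only to avoid depending on the `javax` or `jakarta` flavor of `@PreDestroy`.

```kotlin
import com.hazelcast.core.HazelcastInstance
import org.springframework.beans.factory.DisposableBean
import org.springframework.stereotype.Component

// Hypothetical component (not from the original post): shuts the member down
// gracefully when Spring destroys this bean during context close.
@Component
class HazelcastGracefulShutdown(
    private val hazelcastInstance: HazelcastInstance
) : DisposableBean {

    override fun destroy() {
        // Skip the call if the member has already been shut down elsewhere.
        if (hazelcastInstance.lifecycleService.isRunning) {
            hazelcastInstance.shutdown()
        }
    }
}
```

Since this component depends on the `HazelcastInstance` bean, Spring destroys it before the instance bean itself, so the shutdown is triggered from the application side rather than from Hazelcast's disabled JVM hook.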

Rolling Member Upgrade Is Supported Only in the Paid Edition

[hz.nervous_hodgkin.generic-operation.thread-1] ERROR com.hazelcast.security - [192.168.0.2]:5701 [foo-dev] [5.3.0] Node could not join cluster. Before join check failed node is going to shutdown now!
[hz.nervous_hodgkin.generic-operation.thread-1] ERROR com.hazelcast.security - [192.168.0.2]:5701 [foo-dev] [5.3.0] Reason of failure for node join: Joining node's version 5.3.0 is not compatible with cluster version 5.2 (Rolling Member Upgrades are only supported in Hazelcast Enterprise)

Thoughts on Production Adoption

References

tags: Server-Sent Event