spring boot · 2024-12-18 0

springboot使用redis session 浅析

一、示例

1.pom

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.2</version>
    <relativePath/>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.session</groupId>
        <artifactId>spring-session-data-redis</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
</dependencies>

2.HttpSessionListener

/**
 * Demo listener that prints a marker line to standard output for each HTTP session
 * lifecycle callback it receives.
 */
@Component
public class MyHttpSessionListener implements HttpSessionListener {

    /** Prints {@code sessionCreated} when the created callback is invoked. */
    @Override
    public void sessionCreated(HttpSessionEvent se) {
        announce("sessionCreated");
    }

    /** Prints {@code sessionDestroyed} when the destroyed callback is invoked. */
    @Override
    public void sessionDestroyed(HttpSessionEvent se) {
        announce("sessionDestroyed");
    }

    /** Writes the given marker as a single line to standard output. */
    private static void announce(String marker) {
        System.out.println(marker);
    }
}

3.controller

/**
 * Demo controller exposing endpoints that create and destroy an HTTP session.
 */
@RestController
public class IndexController {

    /** Name of the session attribute written by {@link #create(HttpServletRequest)}. */
    private static final String SESSION_ATTRIBUTE = "UserId";

    /**
     * Landing endpoint; does not touch the session.
     *
     * @return a fixed welcome message
     */
    @GetMapping("/")
    public String index() {
        return "welcome to index!";
    }

    /**
     * Obtains the session (creating one if the request has none), stores a user id attribute
     * in it and prints the session's max inactive interval to standard output.
     *
     * @param request the current request
     * @return the literal {@code "create"}
     */
    @GetMapping("/create")
    public String create(HttpServletRequest request) {
        HttpSession session = request.getSession();
        session.setAttribute(SESSION_ATTRIBUTE, "1");
        int interval = session.getMaxInactiveInterval();
        System.out.println(interval);
        return "create";
    }

    /**
     * Invalidates the caller's session if one exists.
     *
     * @param request the current request
     * @return the literal {@code "destroy"}
     */
    @GetMapping("/destroy")
    public String destroy(HttpServletRequest request) {
        // getSession(false) avoids creating a brand-new session only to invalidate it
        // immediately when the caller did not have one.
        HttpSession session = request.getSession(false);
        if (session != null) {
            session.invalidate();
        }
        return "destroy";
    }
}

4.application.yml

# Embedded server settings.
server:
  port: 18080
  servlet:
    session:
      # Session timeout; a bare number is interpreted as seconds (3600 s = 1 hour).
      timeout: 3600

# Redis connection settings (local instance).
spring:
  redis:
    host: 127.0.0.1
    port: 6379

二、注册 SessionRepositoryFilter 和 SessionEventHttpSessionListenerAdapter

// RedisSessionConfiguration.java
/**
 * Configuration excerpt for Redis-backed HTTP sessions. According to its annotations it is
 * conditional on {@code RedisTemplate} and {@code RedisIndexedSessionRepository} classes, on a
 * {@code RedisConnectionFactory} bean, on the absence of a {@code SessionRepository} bean and
 * on {@code ServletSessionCondition}.
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ RedisTemplate.class, RedisIndexedSessionRepository.class })
@ConditionalOnMissingBean(SessionRepository.class)
@ConditionalOnBean(RedisConnectionFactory.class)
@Conditional(ServletSessionCondition.class)
@EnableConfigurationProperties(RedisSessionProperties.class)
class RedisSessionConfiguration {
    ...
    /**
     * Subclass of {@code RedisHttpSessionConfiguration} that feeds configuration-property
     * values into the inherited setters.
     */
    @Configuration(proxyBeanMethods = false)
    public static class SpringBootRedisHttpSessionConfiguration extends RedisHttpSessionConfiguration {

        /**
         * Copies the timeout, namespace, flush mode, save mode and cleanup cron from the
         * property objects into this configuration.
         *
         * @param sessionProperties source of the timeout decision
         * @param redisSessionProperties source of namespace, flush mode, save mode and cleanup cron
         * @param serverProperties supplies the servlet session timeout to {@code determineTimeout}
         */
        @Autowired
        public void customize(SessionProperties sessionProperties, RedisSessionProperties redisSessionProperties,
                ServerProperties serverProperties) {
            // The supplier exposes server.servlet.session.timeout; presumably used as a
            // fallback by determineTimeout - confirm against SessionProperties.
            Duration timeout = sessionProperties
                    .determineTimeout(() -> serverProperties.getServlet().getSession().getTimeout());
            if (timeout != null) {
                // The Duration is narrowed to whole seconds as an int.
                setMaxInactiveIntervalInSeconds((int) timeout.getSeconds());
            }
            setRedisNamespace(redisSessionProperties.getNamespace());
            setFlushMode(redisSessionProperties.getFlushMode());
            setSaveMode(redisSessionProperties.getSaveMode());
            setCleanupCron(redisSessionProperties.getCleanupCron());
        }

    }
}
// RedisHttpSessionConfiguration.java
/**
 * Excerpt of the configuration that builds the {@code RedisIndexedSessionRepository} bean.
 * Several fields used below (event publisher, index resolver, serializer, namespace, flush and
 * save modes, customizers) are declared outside this excerpt.
 */
@Configuration(proxyBeanMethods = false)
public class RedisHttpSessionConfiguration extends SpringHttpSessionConfiguration
        implements BeanClassLoaderAware, EmbeddedValueResolverAware, ImportAware {

    // Starts from MapSession's default and can be overridden through the setter below.
    private Integer maxInactiveIntervalInSeconds = MapSession.DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS;

    /**
     * Creates the session repository and applies this configuration's settings to it.
     *
     * @return the configured repository
     */
    @Bean
    public RedisIndexedSessionRepository sessionRepository() {
        RedisTemplate<Object, Object> redisTemplate = createRedisTemplate();
        RedisIndexedSessionRepository sessionRepository = new RedisIndexedSessionRepository(redisTemplate);
        sessionRepository.setApplicationEventPublisher(this.applicationEventPublisher);
        // Index resolver and serializer are only applied when they have been set.
        if (this.indexResolver != null) {
            sessionRepository.setIndexResolver(this.indexResolver);
        }
        if (this.defaultRedisSerializer != null) {
            sessionRepository.setDefaultSerializer(this.defaultRedisSerializer);
        }
        sessionRepository.setDefaultMaxInactiveInterval(this.maxInactiveIntervalInSeconds);
        // A blank namespace leaves the repository's own key namespace untouched.
        if (StringUtils.hasText(this.redisNamespace)) {
            sessionRepository.setRedisKeyNamespace(this.redisNamespace);
        }
        sessionRepository.setFlushMode(this.flushMode);
        sessionRepository.setSaveMode(this.saveMode);
        int database = resolveDatabase();
        sessionRepository.setDatabase(database);
        // Customizers run last, after all settings above have been applied.
        this.sessionRepositoryCustomizers
                .forEach((sessionRepositoryCustomizer) -> sessionRepositoryCustomizer.customize(sessionRepository));
        return sessionRepository;
    }

    /**
     * Sets the max inactive interval handed to the repository as its default.
     *
     * @param maxInactiveIntervalInSeconds the interval in seconds
     */
    public void setMaxInactiveIntervalInSeconds(int maxInactiveIntervalInSeconds) {
        this.maxInactiveIntervalInSeconds = maxInactiveIntervalInSeconds;
    }
}
// SpringHttpSessionConfiguration.java
/**
 * Excerpt of the base configuration that registers the {@code SessionRepositoryFilter} and the
 * {@code SessionEventHttpSessionListenerAdapter} beans.
 */
@Configuration(proxyBeanMethods = false)
public class SpringHttpSessionConfiguration implements ApplicationContextAware {

    /**
     * Creates the filter around the given repository and hands it the session id resolver
     * (a field declared outside this excerpt).
     *
     * @param sessionRepository the repository the filter delegates to
     * @return the configured filter
     */
    @Bean
    public <S extends Session> SessionRepositoryFilter<? extends Session> springSessionRepositoryFilter(
            SessionRepository<S> sessionRepository) {
        SessionRepositoryFilter<S> sessionRepositoryFilter = new SessionRepositoryFilter<>(sessionRepository);
        sessionRepositoryFilter.setHttpSessionIdResolver(this.httpSessionIdResolver);
        return sessionRepositoryFilter;
    }

    // Empty unless listeners are injected through the setter below.
    private List<HttpSessionListener> httpSessionListeners = new ArrayList<>();

    /**
     * Optional injection point (required = false) for HttpSessionListener instances.
     *
     * @param listeners the listeners to forward session events to
     */
    @Autowired(required = false)
    public void setHttpSessionListeners(List<HttpSessionListener> listeners) {
        this.httpSessionListeners = listeners;
    }

    /**
     * Creates the adapter from the collected listeners.
     *
     * @return the adapter wrapping the current listener list
     */
    @Bean
    public SessionEventHttpSessionListenerAdapter sessionEventHttpSessionListenerAdapter() {
        return new SessionEventHttpSessionListenerAdapter(this.httpSessionListeners);
    }
}

三、包装 request, response

1) 把 request 包装成 SessionRepositoryRequestWrapper
2) 把 response 包装成 SessionRepositoryResponseWrapper

// SessionRepositoryFilter.java
/**
 * Exposes the repository as a request attribute, wraps the request and response, runs the rest
 * of the filter chain with the wrappers, and always commits the session afterwards.
 */
@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)
        throws ServletException, IOException {
    request.setAttribute(SESSION_REPOSITORY_ATTR, this.sessionRepository);

    SessionRepositoryRequestWrapper wrappedRequest = new SessionRepositoryRequestWrapper(request, response);
    SessionRepositoryResponseWrapper wrappedResponse = new SessionRepositoryResponseWrapper(wrappedRequest,
            response);

    try {
        filterChain.doFilter(wrappedRequest, wrappedResponse);
    }
    finally {
        // Runs even when the chain throws.
        wrappedRequest.commitSession();
    }
}

四、getSession()

1) getCurrentSession() 从 request 的 "org.springframework.session.SessionRepository.CURRENT_SESSION" 属性,获得 Session
2) getRequestedSession() 从 redis 中获得 session,requestedSession 类型是 RedisIndexedSessionRepository$RedisSession
getRequestedSession() 读取 redis 的 key 是 spring:session:sessions:e941ea64-3218-4304-9dec-ba346372d308 的值,取出 redis 值 lastAccessedTime 与 maxInactiveInterval 对比,判断 session 是否过期,如果过期,返回 null
3) requestedSession.setLastAccessedTime(Instant.now()),设置 RedisSession 的 Map<String, Object> delta 放入 lastAccessedTime 值
设置 RedisSession 的 MapSession 的 setLastAccessedTime
4) setCurrentSession(currentSession),设置 request 的 "org.springframework.session.SessionRepository.CURRENT_SESSION" 值

// SessionRepositoryFilter$SessionRepositoryRequestWrapper.java 
/** Shorthand for {@code getSession(true)}: returns the session, creating one if needed. */
@Override
public HttpSessionWrapper getSession() {
    return getSession(true);
}

/**
 * Returns the session for this request.
 *
 * Lookup order: the wrapper already stored on the request, then the session the client asked
 * for (looked up through {@code getRequestedSession()}), and finally, only when
 * {@code create} is true, a newly created session.
 *
 * @param create whether to create a session when none can be found
 * @return the session wrapper, or {@code null} when none exists and {@code create} is false
 */
@Override
public HttpSessionWrapper getSession(boolean create) {
    HttpSessionWrapper currentSession = getCurrentSession();
    if (currentSession != null) {
        return currentSession;
    }
    S requestedSession = getRequestedSession();
    if (requestedSession != null) {
        // Only reuse the requested session if the request has not been flagged as carrying
        // an invalid session id.
        if (getAttribute(INVALID_SESSION_ID_ATTR) == null) {
            requestedSession.setLastAccessedTime(Instant.now());
            this.requestedSessionIdValid = true;
            currentSession = new HttpSessionWrapper(requestedSession, getServletContext());
            currentSession.markNotNew();
            setCurrentSession(currentSession);
            return currentSession;
        }
    }
    else {
        ...
        // Flag the request so the requested id is treated as invalid from here on.
        setAttribute(INVALID_SESSION_ID_ATTR, "true");
    }
    if (!create) {
        return null;
    }
    ...
    // No usable session: create one and cache its wrapper on the request.
    S session = SessionRepositoryFilter.this.sessionRepository.createSession();
    session.setLastAccessedTime(Instant.now());
    currentSession = new HttpSessionWrapper(session, getServletContext());
    setCurrentSession(currentSession);
    return currentSession;
}

/** Reads the session wrapper stored under CURRENT_SESSION_ATTR on this request, if any. */
private HttpSessionWrapper getCurrentSession() {
    return (HttpSessionWrapper) getAttribute(CURRENT_SESSION_ATTR);
}

/**
 * Resolves the session the client asked for and caches the outcome until the cache flag is
 * reset.
 *
 * @return the first session found for any resolved id, or {@code null} if none was found
 */
private S getRequestedSession() {
    if (!this.requestedSessionCached) {
        List<String> sessionIds = SessionRepositoryFilter.this.httpSessionIdResolver.resolveSessionIds(this);
        for (String sessionId : sessionIds) {
            // Remember the first resolved id even if no session is found for it.
            if (this.requestedSessionId == null) {
                this.requestedSessionId = sessionId;
            }
            S session = SessionRepositoryFilter.this.sessionRepository.findById(sessionId);
            if (session != null) {
                // The first id that maps to a session wins; later ids are not tried.
                this.requestedSession = session;
                this.requestedSessionId = sessionId;
                break;
            }
        }
        this.requestedSessionCached = true;
    }
    return this.requestedSession;
}

五、session.setAttribute()

1) this.session.getAttribute(name),this.session 是 RedisIndexedSessionRepository$RedisSession
从 this.session 的 MapSession cached 取得指定 name 的值
2) 把 value 放入 this.cached;把 value 放入 this.delta,key 为 "sessionAttr:UserId",此时 this.delta 有两个 key:"lastAccessedTime" 和 "sessionAttr:UserId"

// SessionRepositoryFilter$SessionRepositoryRequestWrapper$HttpSessionWrapper.java
/**
 * Excerpt: verifies the wrapper state and reads the attribute's previous value from the
 * underlying session; the remainder of the method is elided here.
 */
@Override
public void setAttribute(String name, Object value) {
    checkState();
    Object oldValue = this.session.getAttribute(name);
    ...
}
// RedisIndexedSessionRepository$RedisSession.java
/**
 * Excerpt: reads the attribute from the in-memory {@code cached} session; intermediate steps
 * are elided here.
 */
@Override
public <T> T getAttribute(String attributeName) {
    T attributeValue = this.cached.getAttribute(attributeName);
    ...
    return attributeValue;
}

/**
 * Stores the attribute in the in-memory {@code cached} session and records it in
 * {@code delta} under the key produced by {@code getSessionAttrNameKey}, then calls
 * {@code flushImmediateIfNecessary()}.
 */
@Override
public void setAttribute(String attributeName, Object attributeValue) {
    this.cached.setAttribute(attributeName, attributeValue);
    this.delta.put(getSessionAttrNameKey(attributeName), attributeValue);
    flushImmediateIfNecessary();
}

六、session.getAttribute

从 this.session 的 MapSession cached 取得指定 name 的值

// SessionRepositoryFilter$SessionRepositoryRequestWrapper$HttpSessionWrapper.java
/** Verifies the wrapper state, then delegates the lookup to the underlying session. */
@Override
public Object getAttribute(String name) {
    checkState();
    return this.session.getAttribute(name);
}

七、commitSession()

当设置有效时间为 3600s

1) SessionRepositoryFilter.this.sessionRepository.save(session);
2) 当 delta 不为空时,会把 session 保存到 redis,并修改 redis 的 key 过期时间,即续期
3) key 为 spring:session:sessions:e941ea64-3218-4304-9dec-ba346372d308,是 hash 类型,放入 this.delta 的值,有 "lastAccessedTime" -> 1734156690429,"sessionAttr:UserId" -> "1","maxInactiveInterval" -> 3600,"creationTime" -> 1734156690429,设置 key 有效时间为 3600s + 5min
4) key 为 spring:session:expirations:1734164640000(1734164640000 是过期时间),是 set 类型,值有 expires:e941ea64-3218-4304-9dec-ba346372d308,设置 key 有效时间为 3600s + 5min
可以把这种 key 看作是桶,每次保存 session 的时候,如果上次访问与这次访问不在同一分钟内,会把值从旧桶移除,添加到新桶中
5) key 为 spring:session:sessions:expires:e941ea64-3218-4304-9dec-ba346372d308,是 string 类型,值为 "",设置 key 有效时间为 3600s
6) this.sessionRedisOperations.convertAndSend 发布消息 channel 是 spring:session:event:0:created:e941ea64-3218-4304-9dec-ba346372d308,message 是 session.delta
7) this.requestedSessionCached = false
8) SessionRepositoryFilter.this.httpSessionIdResolver.setSessionId(this, this.response, sessionId),执行写响应头 response.addHeader("Set-Cookie", "SESSION=e941ea64-3218-4304-9dec-ba346372d308; Path=/; HttpOnly; SameSite=Lax");

// SessionRepositoryFilter.java
/**
 * Wraps the request and response, runs the filter chain with the wrappers, and commits the
 * session in a {@code finally} block so the commit happens even if the chain throws.
 */
@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)
        throws ServletException, IOException {
    // Make the repository reachable from the request.
    request.setAttribute(SESSION_REPOSITORY_ATTR, this.sessionRepository);

    SessionRepositoryRequestWrapper wrappedRequest = new SessionRepositoryRequestWrapper(request, response);
    SessionRepositoryResponseWrapper wrappedResponse = new SessionRepositoryResponseWrapper(wrappedRequest,
            response);

    try {
        filterChain.doFilter(wrappedRequest, wrappedResponse);
    }
    finally {
        wrappedRequest.commitSession();
    }
}
// SessionRepositoryFilter$SessionRepositoryRequestWrapper
/**
 * Finishes session handling for the request: either expires the client-side session id when
 * there is no current session and {@code isInvalidateClientSession()} is true, or saves the
 * current session and sends its id to the client when needed.
 */
private void commitSession() {
    HttpSessionWrapper wrappedSession = getCurrentSession();
    if (wrappedSession == null) {
        if (isInvalidateClientSession()) {
            SessionRepositoryFilter.this.httpSessionIdResolver.expireSession(this, this.response);
        }
    }
    else {
        S session = wrappedSession.getSession();
        // The requested-session cache is reset before the session is saved.
        clearRequestedSessionCache();
        SessionRepositoryFilter.this.sessionRepository.save(session);
        String sessionId = session.getId();
        // Write the id only when the requested id was not valid or differs from the saved one.
        if (!isRequestedSessionIdValid() || !sessionId.equals(getRequestedSessionId())) {
            SessionRepositoryFilter.this.httpSessionIdResolver.setSessionId(this, this.response, sessionId);
        }
    }
}

/** Resets the cached-lookup flag and forgets the cached requested session and its id. */
private void clearRequestedSessionCache() {
    this.requestedSessionCached = false;
    this.requestedSession = null;
    this.requestedSessionId = null;
}
// RedisIndexedSessionRepository.java
/**
 * Saves the session and, for a session still flagged as new, publishes a message on its
 * "created" channel and clears the flag.
 */
@Override
public void save(RedisSession session) {
    session.save();
    if (session.isNew) {
        String sessionCreatedKey = getSessionCreatedChannel(session.getId());
        // NOTE(review): RedisSession.saveDelta() reassigns delta to a fresh empty map once it
        // has written it, so the delta sent here appears to be empty by this point - confirm.
        this.sessionRedisOperations.convertAndSend(sessionCreatedKey, session.delta);
        session.isNew = false;
    }
}

/**
 * Excerpt of the Redis-backed session: only the save path is shown. Fields such as
 * {@code delta}, {@code originalPrincipalName} and {@code originalLastAccessTime} are declared
 * outside this excerpt.
 */
final class RedisSession implements Session { 
    /** Persists a pending session-id change first, then the accumulated attribute delta. */
    private void save() {
        saveChangeSessionId();
        saveDelta();
    }

    /**
     * Writes the pending delta to the session hash, maintains the principal-name index when a
     * principal-related attribute changed, resets the delta and notifies the expiration policy.
     * Does nothing when the delta is empty.
     */
    private void saveDelta() {
        if (this.delta.isEmpty()) {
            return;
        }
        String sessionId = getId();
        getSessionBoundHashOperations(sessionId).putAll(this.delta);
        String principalSessionKey = getSessionAttrNameKey(
                FindByIndexNameSessionRepository.PRINCIPAL_NAME_INDEX_NAME);
        String securityPrincipalSessionKey = getSessionAttrNameKey(SPRING_SECURITY_CONTEXT);
        // Re-index only when the delta touched the principal name or the security context.
        if (this.delta.containsKey(principalSessionKey) || this.delta.containsKey(securityPrincipalSessionKey)) {
            if (this.originalPrincipalName != null) {
                // Remove this session id from the set kept for the previous principal.
                String originalPrincipalRedisKey = getPrincipalKey(this.originalPrincipalName);
                RedisIndexedSessionRepository.this.sessionRedisOperations.boundSetOps(originalPrincipalRedisKey)
                        .remove(sessionId);
            }
            Map<String, String> indexes = RedisIndexedSessionRepository.this.indexResolver.resolveIndexesFor(this);
            String principal = indexes.get(PRINCIPAL_NAME_INDEX_NAME);
            this.originalPrincipalName = principal;
            if (principal != null) {
                // Add this session id to the set kept for the current principal.
                String principalRedisKey = getPrincipalKey(principal);
                RedisIndexedSessionRepository.this.sessionRedisOperations.boundSetOps(principalRedisKey)
                        .add(sessionId);
            }
        }

        // Start a fresh, empty delta now that the changes have been written.
        this.delta = new HashMap<>(this.delta.size());

        // Previous expiration instant in epoch millis, or null when no original access time is known.
        Long originalExpiration = (this.originalLastAccessTime != null)
                ? this.originalLastAccessTime.plus(getMaxInactiveInterval()).toEpochMilli() : null;
        RedisIndexedSessionRepository.this.expirationPolicy.onExpirationUpdated(originalExpiration, this);
    }
}
// RedisSessionExpirationPolicy.java
/**
 * Updates the Redis expiration bookkeeping for a session after it has been saved.
 *
 * @param originalExpirationTimeInMilli the previous expiration time in epoch millis, or
 *        {@code null} when there is none
 * @param session the session whose expiration changed
 */
void onExpirationUpdated(Long originalExpirationTimeInMilli, Session session) {
    String keyToExpire = SESSION_EXPIRES_PREFIX + session.getId();
    // Expiration times are bucketed by rounding up to the next minute.
    long toExpire = roundUpToNextMinute(expiresInMillis(session));

    if (originalExpirationTimeInMilli != null) {
        long originalRoundedUp = roundUpToNextMinute(originalExpirationTimeInMilli);
        // Bucket changed: remove the entry from the old expiration set.
        if (toExpire != originalRoundedUp) {
            String expireKey = getExpirationKey(originalRoundedUp);
            this.redis.boundSetOps(expireKey).remove(keyToExpire);
        }
    }

    long sessionExpireInSeconds = session.getMaxInactiveInterval().getSeconds();
    String sessionKey = getSessionKey(keyToExpire);

    // Negative interval: make sure the expires key exists, then remove the TTL from both
    // the expires key and the session hash.
    if (sessionExpireInSeconds < 0) {
        this.redis.boundValueOps(sessionKey).append("");
        this.redis.boundValueOps(sessionKey).persist();
        this.redis.boundHashOps(getSessionKey(session.getId())).persist();
        return;
    }

    // Register the session in the bucket for its new expiration minute.
    String expireKey = getExpirationKey(toExpire);
    BoundSetOperations<Object, Object> expireOperations = this.redis.boundSetOps(expireKey);
    expireOperations.add(keyToExpire);

    long fiveMinutesAfterExpires = sessionExpireInSeconds + TimeUnit.MINUTES.toSeconds(5);

    // The bucket and the session hash get a TTL five minutes longer than the session's interval.
    expireOperations.expire(fiveMinutesAfterExpires, TimeUnit.SECONDS);
    if (sessionExpireInSeconds == 0) {
        // Zero interval: delete the expires key instead of giving it a TTL.
        this.redis.delete(sessionKey);
    }
    else {
        // Appending "" creates the key if it is missing; its TTL is exactly the interval.
        this.redis.boundValueOps(sessionKey).append("");
        this.redis.boundValueOps(sessionKey).expire(sessionExpireInSeconds, TimeUnit.SECONDS);
    }
    this.redis.boundHashOps(getSessionKey(session.getId())).expire(fiveMinutesAfterExpires, TimeUnit.SECONDS);
}
// CookieHttpSessionIdResolver.java
/**
 * Writes the session id to the response through the cookie serializer, unless the same id has
 * already been written for this request.
 */
@Override
public void setSessionId(HttpServletRequest request, HttpServletResponse response, String sessionId) {
    // The request attribute records the id that was last written for this request.
    if (sessionId.equals(request.getAttribute(WRITTEN_SESSION_ID_ATTR))) {
        return;
    }
    request.setAttribute(WRITTEN_SESSION_ID_ATTR, sessionId);
    this.cookieSerializer.writeCookieValue(new CookieValue(request, response, sessionId));
}

八、request.getSession().invalidate()

1) 移除 redis 中,key 为 spring:session:expirations:1734517620000 中 值 expires:ea063898-f4fe-4141-b38a-1496dcba28c9
2) 删除 key 为 spring:session:sessions:expires:ea063898-f4fe-4141-b38a-1496dcba28c9
3) 设置 maxInactiveInterval 为 0
4) 保存进 redis,如果 maxInactiveInterval 为 0,删除 key 是 spring:session:sessions:expires:ea063898-f4fe-4141-b38a-1496dcba28c9
5) clearRequestedSessionCache() 把 requestedSessionCached 置为 false;随后 commitSession() 发现当前 session 为 null,执行 SessionRepositoryFilter.this.httpSessionIdResolver.expireSession(this, this.response),执行写响应头 response.addHeader("Set-Cookie", "SESSION=; Max-Age=0; Expires=Thu, 1 Jan 1970 00:00:00 GMT; Path=/; HttpOnly; SameSite=Lax"),此后同个浏览器再请求,则不会携带 Cookie: SESSION=ea063898-f4fe-4141-b38a-1496dcba28c9 的请求头进行请求。当然可以通过其他方式携带这样的请求头,例如通过 curl 指定请求头,当 java 执行 getSession() 时,会从 redis 获得 key 为 spring:session:sessions:ea063898-f4fe-4141-b38a-1496dcba28c9 的值,此时得到的 maxInactiveInterval 为 0,是无效的 session

// SessionRepositoryFilter$SessionRepositoryRequestWrapper$HttpSessionWrapper
/**
 * Invalidates this session: runs the superclass invalidation, flags the requested session as
 * invalidated, removes the current session from the request, resets the requested-session
 * cache and deletes the session from the repository.
 */
@Override
public void invalidate() {
    super.invalidate();
    SessionRepositoryRequestWrapper.this.requestedSessionInvalidated = true;
    setCurrentSession(null);
    clearRequestedSessionCache();
    SessionRepositoryFilter.this.sessionRepository.deleteById(getId());
}

/**
 * Stores the wrapper under CURRENT_SESSION_ATTR on the request, or removes that attribute when
 * {@code null} is passed.
 */
private void setCurrentSession(HttpSessionWrapper currentSession) {
    if (currentSession == null) {
        removeAttribute(CURRENT_SESSION_ATTR);
    }
    else {
        setAttribute(CURRENT_SESSION_ATTR, currentSession);
    }
}
// RedisIndexedSessionRepository.java
/**
 * Deletes the session with the given id; does nothing when no such session can be loaded.
 */
@Override
public void deleteById(String sessionId) {
    // NOTE(review): the second argument presumably allows loading an already-expired
    // session - confirm against getSession(String, boolean).
    RedisSession session = getSession(sessionId, true);
    if (session == null) {
        return;
    }

    cleanupPrincipalIndex(session);
    this.expirationPolicy.onDelete(session);

    String expireKey = getExpiredKey(session.getId());
    this.sessionRedisOperations.delete(expireKey);

    // The session is saved with a zero max inactive interval rather than removed directly here.
    session.setMaxInactiveInterval(Duration.ZERO);
    save(session);
}
// SessionRepositoryFilter$SessionRepositoryRequestWrapper
/**
 * Called at the end of the request. With no current session on the request, the client-side
 * session id is expired when {@code isInvalidateClientSession()} is true; otherwise the
 * current session is saved and its id is written to the response when needed.
 */
private void commitSession() {
    HttpSessionWrapper wrappedSession = getCurrentSession();
    if (wrappedSession == null) {
        // No current session on the request, e.g. after invalidate() removed it.
        if (isInvalidateClientSession()) {
            SessionRepositoryFilter.this.httpSessionIdResolver.expireSession(this, this.response);
        }
    }
    else {
        S session = wrappedSession.getSession();
        clearRequestedSessionCache();
        SessionRepositoryFilter.this.sessionRepository.save(session);
        String sessionId = session.getId();
        if (!isRequestedSessionIdValid() || !sessionId.equals(getRequestedSessionId())) {
            SessionRepositoryFilter.this.httpSessionIdResolver.setSessionId(this, this.response, sessionId);
        }
    }
}
// CookieHttpSessionIdResolver.java
/** Expires the client-side session id by writing a cookie value that is the empty string. */
@Override
public void expireSession(HttpServletRequest request, HttpServletResponse response) {
    this.cookieSerializer.writeCookieValue(new CookieValue(request, response, ""));
}

九、session 事件

RedisIndexedSessionRepository 订阅 redis

channel 为 __keyevent@0__:del、__keyevent@0__:expired
pattern 为 spring:session:event:0:created:*
// RedisHttpSessionConfiguration.java
/**
 * Creates the Redis message listener container and subscribes the session repository to the
 * session deleted and expired channels and to the session-created channel pattern.
 *
 * @param sessionRepository the repository that receives the messages
 * @return the configured container
 */
@Bean
public RedisMessageListenerContainer springSessionRedisMessageListenerContainer(
        RedisIndexedSessionRepository sessionRepository) {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(this.redisConnectionFactory);
    // Executors are only applied when they have been configured.
    if (this.redisTaskExecutor != null) {
        container.setTaskExecutor(this.redisTaskExecutor);
    }
    if (this.redisSubscriptionExecutor != null) {
        container.setSubscriptionExecutor(this.redisSubscriptionExecutor);
    }
    // Exact channel subscriptions: deleted and expired.
    container.addMessageListener(sessionRepository,
            Arrays.asList(new ChannelTopic(sessionRepository.getSessionDeletedChannel()),
                    new ChannelTopic(sessionRepository.getSessionExpiredChannel())));
    // Pattern subscription: every channel starting with the session-created prefix.
    container.addMessageListener(sessionRepository,
            Collections.singletonList(new PatternTopic(sessionRepository.getSessionCreatedChannelPrefix() + "*")));
    return container;
}

利用 Redis 键空间通知机制,比如,在0号库,键 myKey 过期,会触发两个消息,等价于执行两个 publish 命令:publish __keyspace@0__:myKey expired 和 publish __keyevent@0__:expired myKey

publish <channel> <message>
keyspace事件订阅端 psubscribe __keyspace@0__:myKey
keyevent事件订阅端 psubscribe __keyevent@0__:expired
this.sessionCreatedChannelPrefixBytes 是 spring:session:event:0:created: ,用于判断 channel 是否以此开头
this.expiredKeyPrefixBytes 是 spring:session:sessions:expires:,用于判断消息内容是否以此开头
this.sessionDeletedChannelBytes 是 __keyevent@0__:del,用于判断 channel 是否等于此
this.sessionExpiredChannelBytes 是 __keyevent@0__:expired,用于判断 channel 是否等于此

1) 创建 session,保存 session 后

this.sessionRedisOperations.convertAndSend 发布消息 channel 是 spring:session:event:0:created:e941ea64-3218-4304-9dec-ba346372d308,message 是 <session.delta>
RedisIndexedSessionRepository 的 onMessage() 接收消息,触发 handleCreated() 方法,发布 SessionCreatedEvent

pattern 为 spring:session:event:0:created:*
channel 为 spring:session:event:0:created:e941ea64-3218-4304-9dec-ba346372d308
message 为 <session.delta>

2) session 过期,接收消息,触发 handleExpired() 方法,发布 SessionExpiredEvent

pattern 为 __keyevent@0__:expired
channel 为 __keyevent@0__:expired
message 为 spring:session:sessions:expires:e941ea64-3218-4304-9dec-ba346372d308

3) session 删除,接收消息,触发 handleDeleted() 方法,发布 SessionDeletedEvent

pattern 为 __keyevent@0__:del
channel 为 __keyevent@0__:del
message 为 spring:session:sessions:expires:e941ea64-3218-4304-9dec-ba346372d308
// RedisIndexedSessionRepository
/**
 * Dispatches an incoming Redis message: messages on a session-created channel go to
 * {@code handleCreated}; messages on the deleted or expired channel whose body starts with the
 * expired-key prefix go to {@code handleDeleted} or {@code handleExpired}. Anything else is
 * ignored.
 *
 * @param message the received message (channel and body)
 * @param pattern the subscription pattern; not read in the code shown here
 */
@Override
public void onMessage(Message message, byte[] pattern) {
    byte[] messageChannel = message.getChannel();

    if (ByteUtils.startsWith(messageChannel, this.sessionCreatedChannelPrefixBytes)) {
        ...
        // The body is deserialized into a map and passed on together with the channel name.
        Map<Object, Object> loaded = (Map<Object, Object>) this.defaultSerializer.deserialize(message.getBody());
        handleCreated(loaded, new String(messageChannel));
        return;
    }

    byte[] messageBody = message.getBody();

    // Ignore bodies that do not start with the expired-key prefix.
    if (!ByteUtils.startsWith(messageBody, this.expiredKeyPrefixBytes)) {
        return;
    }

    boolean isDeleted = Arrays.equals(messageChannel, this.sessionDeletedChannelBytes);
    if (isDeleted || Arrays.equals(messageChannel, this.sessionExpiredChannelBytes)) {
        // The session id is whatever follows the last ':' in the message body.
        String body = new String(messageBody);
        int beginIndex = body.lastIndexOf(":") + 1;
        int endIndex = body.length();
        String sessionId = body.substring(beginIndex, endIndex);

        RedisSession session = getSession(sessionId, true);

        if (session == null) {
            return;
        }
        ...
        if (isDeleted) {
            handleDeleted(session);
        }
        else {
            handleExpired(session);
        }
    }
}