lijilei 3 vuotta sitten
vanhempi
commit
fc57e766d3

+ 22 - 0
book-push/src/main/java/com/book/push/action/PushAction.java

@@ -0,0 +1,22 @@
+package com.book.push.action;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.springframework.stereotype.Service;
+
+/**
+ * created in 2021/9/7
+ * Project: book-store
+ *
+ * @author win7
+ */
+@Component
+@Slf4j
+public class PushAction {
+
+    public void pushBySubscribe(Integer platformId, String appid, String openid) {
+        log.info("关注后智能推送:" );
+    }
+
+
+}

+ 102 - 0
book-push/src/main/java/com/book/push/config/KeyExpiredListener.java

@@ -0,0 +1,102 @@
+package com.book.push.config;
+
+import com.book.push.action.PushAction;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import redis.clients.jedis.JedisPubSub;
+
+/**
+ * created in 2021/9/7
+ * Project: book-store
+ *
+ * @author win7
+ */
+@Configuration
+@Slf4j
+public class KeyExpiredListener extends JedisPubSub {
+    public static final String KEY_PREFIX_MP_SUBSCRIBE = "mp_subscribe";
+    public static final String KEY_PREFIX_MP_SIGN = "mp_sign";
+    public static final String KEY_PREFIX_BOOK_CONTINUE_READ = "book_read";
+    public static final String KEY_PREFIX_ORDER_NOPAY = "order_nopay";
+    public static final String KEY_PREFIX_MP_NOSIGN = "mp_nosign";
+    public static final String prefix = "auto_push:";
+    @Autowired
+    private PushAction pushAction;
+
+    @Bean
+    public KeyExpiredListener getBean() {
+        return new KeyExpiredListener();
+    }
+
+    @Override
+    public void onPSubscribe(String pattern, int subscribedChannels) {
+        System.out.println("onPSubscribe " + pattern + "  " + subscribedChannels);
+    }
+
+    @Override
+    public void onPMessage(String pattern, String channel, String message) {
+        log.info("过期key:" + message);
+        //收到消息key的键值
+        if (StringUtils.isEmpty(message)) {
+            return;
+        }
+        if (!message.startsWith(prefix)) {
+            return;
+        }
+
+
+        String[] split = message.split(":");
+        String actionString = split[1];
+        Integer platformId = Integer.valueOf(split[2]);
+        String appid = split[3];
+        String openid = split[4];
+        log.info("智能推送:platformId:{},appid:{},openid:{}", platformId, appid, openid);
+        if (actionString.startsWith(KEY_PREFIX_MP_SUBSCRIBE)) {
+            pushAction.pushBySubscribe(platformId, appid, openid);
+
+        } else if (actionString.startsWith(KEY_PREFIX_MP_SIGN)) {
+
+
+        } else if (actionString.startsWith(KEY_PREFIX_BOOK_CONTINUE_READ)) {
+
+
+        } else if (actionString.startsWith(KEY_PREFIX_ORDER_NOPAY)) {
+
+
+        } else if (actionString.startsWith(KEY_PREFIX_MP_NOSIGN)) {
+
+
+        }
+    }
+
+    protected String getKey(Integer platformId, String appid, String openid, String keyPrefix) {
+        return prefix + keyPrefix + ":" + platformId + ":" + appid + ":" + openid;
+    }
+
+
+    public String getKeyMpSubscribe(Integer platformId, String appid, String openid) {
+        return getKey(platformId, appid, openid, KEY_PREFIX_MP_SUBSCRIBE);
+    }
+
+    public String getKeyMpSign(Integer platformId, String appid, String openid) {
+        return getKey(platformId, appid, openid, KEY_PREFIX_MP_SIGN);
+    }
+
+    public String getKeyPrefixBookContinueRead(Integer platformId, String appid, String openid) {
+        return getKey(platformId, appid, openid, KEY_PREFIX_BOOK_CONTINUE_READ);
+    }
+
+    public String getKeyPrefixOrderNopay(Integer platformId, String appid, String openid) {
+        return getKey(platformId, appid, openid, KEY_PREFIX_ORDER_NOPAY);
+    }
+
+    public String getKeyPrefixMpNosign(Integer platformId, String appid, String openid) {
+        return getKey(platformId, appid, openid, KEY_PREFIX_MP_NOSIGN);
+    }
+
+
+}
+

+ 28 - 22
book-push/src/main/java/com/book/push/handler/SubscribeHandler.java

@@ -8,6 +8,7 @@ import com.book.push.builder.TextBuilder;
 import com.book.push.builder.TuWenBuilder;
 import com.book.push.cons.UrlCons;
 import com.book.push.service.dao.*;
+import com.book.push.service.redis.RedisService;
 import com.book.push.utils.JsonUtils;
 import com.book.push.vo.NewsMsgContent;
 import com.book.push.vo.TextMsgContent;
@@ -43,32 +44,37 @@ public class SubscribeHandler extends AbstractHandler {
     private WechatSubscribeConfigService wechatSubscribeConfigService;
     @Autowired
     private OpHostService opHostService;
+    @Autowired
+    private RedisService redisService;
+
     @Override
     public WxMpXmlOutMessage handle(WxMpXmlMessage wxMessage,
                                     Map<String, Object> context, WxMpService weixinService,
                                     WxSessionManager sessionManager) throws WxErrorException {
 
-       log.info("新关注用户 OPENID: " + wxMessage.getFromUser());
+        log.info("新关注用户 OPENID: " + wxMessage.getFromUser());
 
         // 获取微信用户基本信息
-        WxMpUser userWxInfo ;
+        WxMpUser userWxInfo;
         String openId = wxMessage.getFromUser();
-        String appid = StringUtils.isEmpty(wxMessage.getAuthorizeAppId())?wxMessage.getToUser():wxMessage.getAuthorizeAppId();
+        String appid = StringUtils.isEmpty(wxMessage.getAuthorizeAppId()) ? wxMessage.getToUser() : wxMessage.getAuthorizeAppId();
         AdminConfig adminConfig = adminConfigService.selectByAppid(appid);
         User user = userService.selectByOpenId(openId);
         try {
-             userWxInfo = weixinService.getUserService()
+            userWxInfo = weixinService.getUserService()
                     .userInfo(wxMessage.getFromUser(), null);
             if (userWxInfo != null) {
                 // TODO 可以添加关注用户到本地数据库
 
+                if (user == null) {
+                    user = userService.insertUserByWxUser(userWxInfo, adminConfig.getAdminId());
+                }else {
+                    user = userService.updateUserByWxUser(user.getId(),userWxInfo);
+                }
 
 
-                if (user==null){
-
-                  user =   userService.createUserByWxUser(userWxInfo,adminConfig.getAdminId());
-
-                }
+                String keyMpSubscribe = redisService.getKeyListener().getKeyMpSubscribe(adminConfig.getPlatformId(), appid, openId);
+                redisService.set(keyMpSubscribe, JsonUtils.toJsonString(user),60L);
 
             }
         } catch (WxErrorException e) {
@@ -82,7 +88,7 @@ public class SubscribeHandler extends AbstractHandler {
         try {
             responseResult = this.handleSpecial(wxMessage);
         } catch (Exception e) {
-           log.error(e.getMessage(), e);
+            log.error(e.getMessage(), e);
         }
 
         if (responseResult != null) {
@@ -90,24 +96,24 @@ public class SubscribeHandler extends AbstractHandler {
         }
 
 
-
-
-
         try {
             WechatSubscribeSwitch wechatSubscribeSwitch = wechatSubscribeSwitchService.selectByAdminId(adminConfig.getAdminId());
+            if (wechatSubscribeSwitch ==null){
+                return null;
+            }
             Byte status = wechatSubscribeSwitch.getStatus();
-            if (status.intValue() == 0){
+            if (status.intValue() == 0) {
 
-                return new TextBuilder().build(String.format("@%s,感谢关注啦",user.getNickname()), wxMessage, weixinService);
+                return new TextBuilder().build(String.format("@%s,感谢关注啦", user.getNickname()), wxMessage, weixinService);
             }
-            if (wechatSubscribeSwitch.getType()==1){
-                return new TextBuilder().build(String.format("@%s,感谢关注啦",user.getNickname()), wxMessage, weixinService);
+            if (wechatSubscribeSwitch.getType() == 1) {
+                return new TextBuilder().build(String.format("@%s,感谢关注啦", user.getNickname()), wxMessage, weixinService);
             }
             WechatSubscribeConfig wechatSubscribeConfig = wechatSubscribeConfigService.selectByAdminId(adminConfig.getAdminId());
-            if (wechatSubscribeConfig!=null){
+            if (wechatSubscribeConfig != null) {
                 String host = opHostService.selectById(adminConfig.getOphostId()).getHost();
                 String type = wechatSubscribeConfig.getType().toString();
-                if ("text".equals(type)){
+                if ("text".equals(type)) {
                     String textContent = wechatSubscribeConfig.getTextContent();
                     List<TextMsgContent> textMsgContents = JsonUtils.getList(textContent, TextMsgContent.class);
                     StringBuilder sb = new StringBuilder();
@@ -121,7 +127,7 @@ public class SubscribeHandler extends AbstractHandler {
                     String msg = "@%s,欢迎关注 [%s],点击下方继续阅读 %s\n";
                     msg = String.format(msg, nickname, nick_name, sb.toString());
                     return new TextBuilder().build(msg, wxMessage, weixinService);
-                }else if ("news".equals(type)){
+                } else if ("news".equals(type)) {
                     List<NewsMsgContent> newsMsgContents = JsonUtils.getList(wechatSubscribeConfig.getNewsContent(), NewsMsgContent.class);
                     List<WxMpXmlOutNewsMessage.Item> items = new ArrayList<>();
                     for (NewsMsgContent newsMsgContent : newsMsgContents) {
@@ -137,7 +143,7 @@ public class SubscribeHandler extends AbstractHandler {
                         item.setUrl(url);
                     }
 
-                 return    new TuWenBuilder().build(items, wxMessage, weixinService);
+                    return new TuWenBuilder().build(items, wxMessage, weixinService);
 
                 }
             }
@@ -147,7 +153,7 @@ public class SubscribeHandler extends AbstractHandler {
             log.error(e.getMessage(), e);
         }
 
-           return new TextBuilder().build(String.format("@%s,感谢关注啦",user.getNickname()), wxMessage, weixinService);
+        return new TextBuilder().build(String.format("@%s,感谢关注啦", user.getNickname()), wxMessage, weixinService);
     }
 
     /**

+ 13 - 1
book-push/src/main/java/com/book/push/handler/UnsubscribeHandler.java

@@ -1,10 +1,14 @@
 package com.book.push.handler;
 
+import com.book.dao.cps.pojo.User;
+import com.book.push.service.dao.AdminConfigService;
+import com.book.push.service.dao.UserService;
 import lombok.extern.slf4j.Slf4j;
 import me.chanjar.weixin.common.session.WxSessionManager;
 import me.chanjar.weixin.mp.api.WxMpService;
 import me.chanjar.weixin.mp.bean.message.WxMpXmlMessage;
 import me.chanjar.weixin.mp.bean.message.WxMpXmlOutMessage;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import java.util.Map;
@@ -15,7 +19,8 @@ import java.util.Map;
 @Slf4j
 @Component
 public class UnsubscribeHandler extends AbstractHandler {
-
+    @Autowired
+    private UserService userService;
     @Override
     public WxMpXmlOutMessage handle(WxMpXmlMessage wxMessage,
                                     Map<String, Object> context, WxMpService wxMpService,
@@ -23,6 +28,13 @@ public class UnsubscribeHandler extends AbstractHandler {
         String openId = wxMessage.getFromUser();
        log.info("取消关注用户 OPENID: " + openId);
         // TODO 可以更新本地数据库为取消关注状态
+        User user = userService.selectByOpenId(openId);
+        User updateUser = new User();
+        updateUser.setId(user.getId());
+        updateUser.setIsSubscribe("0");
+        updateUser.setIsSubscribe("0");
+
+        userService.updateUserSelective(updateUser);
         return null;
     }
 

+ 6 - 1
book-push/src/main/java/com/book/push/service/dao/UserService.java

@@ -23,8 +23,8 @@ public interface UserService {
 
     User selectById(Long id);
 
+    User insertUserByWxUser(WxMpUser wxMpUser, int channel_id);
     User createUserByWxUser(WxMpUser wxMpUser, int channel_id);
-
     AdminConfig selectAdminConfigByOpenid(String openid);
 
     List<User> selectListByPushMap(Map map);
@@ -36,4 +36,9 @@ public interface UserService {
 
 
     User createUserByOpenid(String openid, Integer channelId);
+
+    void updateUserSelective(User user);
+
+
+    User updateUserByWxUser(Long id, WxMpUser wxMpUser);
 }

+ 36 - 4
book-push/src/main/java/com/book/push/service/dao/impl/UserServiceImpl.java

@@ -48,8 +48,16 @@ public class UserServiceImpl implements UserService {
     }
 
     @Override
-    public User createUserByWxUser(WxMpUser wxMpUser, int channel_id) {
+    public User insertUserByWxUser(WxMpUser wxMpUser, int channel_id) {
+
+        User user = createUserByWxUser(wxMpUser, channel_id);
+
+        userMapper.insert(user);
+        return user;
+    }
 
+    @Override
+    public User createUserByWxUser(WxMpUser wxMpUser, int channel_id) {
         User user = new User();
         user.setOpenid(wxMpUser.getOpenId());
         user.setArea(wxMpUser.getCity());
@@ -63,15 +71,14 @@ public class UserServiceImpl implements UserService {
         user.setSubscribeTime(wxMpUser.getSubscribeTime().intValue());
         user.setOperateTime(wxMpUser.getSubscribeTime().intValue());
 //        user.setRegisterIp(wxMpUser.geti);
-        user.setSubscriptionExtend("1");
+        user.setIsSubscribe(wxMpUser.getSubscribe() ? "1" : "0");
+        user.setSubscriptionExtend("0");
         user.setProvince(wxMpUser.getProvince());
         user.setCreatetime(DateUtils.getNow());
         user.setChannelId(channel_id);
         user.setKandian(0);
         user.setFreeKandian(0);
         user.setState("1");
-
-        userMapper.insert(user);
         return user;
     }
 
@@ -172,6 +179,31 @@ public class UserServiceImpl implements UserService {
 
     }
 
+    @Override
+    public void updateUserSelective(User user) {
+        userMapper.updateByPrimaryKeySelective(user);
+    }
+
+    @Override
+    public User updateUserByWxUser(Long id,WxMpUser wxMpUser) {
+        User user = new User();
+        user.setId(id);
+        user.setOpenid(wxMpUser.getOpenId());
+        user.setArea(wxMpUser.getCity());
+        user.setCity(wxMpUser.getCity());
+        user.setCountry(wxMpUser.getCountry());
+        user.setUnionid(wxMpUser.getUnionId());
+        user.setNickname(wxMpUser.getNickname());
+        user.setSex(String.valueOf(wxMpUser.getSex()));
+        user.setAvatar(wxMpUser.getHeadImgUrl());
+        user.setSubscribeTime(wxMpUser.getSubscribeTime().intValue());
+        user.setOperateTime(wxMpUser.getSubscribeTime().intValue());
+        user.setIsSubscribe(wxMpUser.getSubscribe() ? "1" : "0");
+        user.setProvince(wxMpUser.getProvince());
+        userMapper.updateByPrimaryKeySelective(user);
+        return user;
+    }
+
     public List<Recharge> getFreeCharges(Long userId) {
         RechargeExample example = RechargeExample.newAndCreateCriteria()
                 .andUserIdEqualTo(userId)

+ 4 - 0
book-push/src/main/java/com/book/push/service/redis/RedisService.java

@@ -1,5 +1,7 @@
 package com.book.push.service.redis;
 
+import com.book.push.config.KeyExpiredListener;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -32,4 +34,6 @@ public interface RedisService {
 
 
     Long del(String key);
+
+    KeyExpiredListener getKeyListener();
 }

+ 11 - 0
book-push/src/main/java/com/book/push/service/redis/impl/RedisServiceImpl.java

@@ -1,7 +1,9 @@
 package com.book.push.service.redis.impl;
 
+import com.book.push.config.KeyExpiredListener;
 import com.book.push.service.redis.RedisService;
 import lombok.Cleanup;
+import lombok.Getter;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import redis.clients.jedis.Jedis;
@@ -20,9 +22,13 @@ import java.util.concurrent.TimeUnit;
  */
 @Service
 public class RedisServiceImpl implements RedisService {
+    @Autowired
+    private KeyExpiredListener keyExpiredListener;
+
     @Autowired
     private JedisPool jedisPool;
 
+
     @Override
     public void set(String key, String value) {
         @Cleanup Jedis jedis = jedisPool.getResource();
@@ -104,4 +110,9 @@ public class RedisServiceImpl implements RedisService {
         @Cleanup Jedis jedis = jedisPool.getResource();
       return jedis.del(key);
     }
+
+    @Override
+    public KeyExpiredListener getKeyListener() {
+        return keyExpiredListener;
+    }
 }

+ 35 - 0
book-push/src/main/java/com/book/push/service/redis/impl/Subscriber.java

@@ -0,0 +1,35 @@
+package com.book.push.service.redis.impl;
+
+import com.book.push.config.KeyExpiredListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+
+/**
+ * created in 2021/9/7
+ * Project: book-store
+ *
+ * @author win7
+ */
+@Component
+@Order(1)
+public class Subscriber implements CommandLineRunner {
+    private String redisTopic = "__keyevnet@0__:expired";
+
+
+    @Autowired
+    private JedisPool jedisPool;
+    @Autowired
+    private KeyExpiredListener keyExpiredListener;
+
+    @Override
+    public void run(String... args) throws Exception {
+        System.out.println("============================开启redis监听=============================================");
+        Jedis jedis = jedisPool.getResource();
+        jedis.psubscribe(keyExpiredListener, redisTopic);
+
+    }
+}