tianyun 1 year ago
parent
commit
87f776c171
8 changed files with 307 additions and 73 deletions
  1. flink.py (+52 -43)
  2. sh/tmp.sh (+16 -14)
  3. test_py_invoke.py (+2 -6)
  4. tmp.py (+33 -2)
  5. tmp1.py (+56 -7)
  6. tmp2.py (+34 -0)
  7. tmp3.py (+1 -0)
  8. tmp4.py (+113 -1)

+ 52 - 43
flink.py

@@ -1,5 +1,5 @@
 import datetime
-import json
+import unittest
 
 import requests
 
@@ -8,12 +8,13 @@ class MyFlink:
     def __init__(self, host):
         self.host = host
 
-    # Return an array of ids
-    def list_job1(self):
+    # Return all job ids
+    def list_job_id(self):
         url = self.host + "/jobs"
         response = requests.get(url)
         return [job["id"] for job in response.json()['jobs']]
 
+    # List all jobs with overview info
     def list_jobs(self):
         url = self.host + "/jobs/overview"
         response = requests.get(url)
@@ -37,22 +38,27 @@ class MyFlink:
             job_details.append(job_detail)
         return job_details
 
+    # Stop all jobs
     def stop_all(self):
-        url = self.host + "/jobs/stop"
-        response = requests.post(url)
-        return response.json()
+        job_ids = self.list_job_id()
+        for job_id in job_ids:
+            self.stop_by_id(job_id)
 
+    # Stop a job by its id
     def stop_by_id(self, job_id):
         url = self.host + f"/jobs/{job_id}/stop"
         response = requests.post(url)
         return response.json()
 
+    # Upload a jar
     def upload_jar(self, jar_path):
         url = self.host + "/jars/upload"
-        files = {'file': open(jar_path, 'rb')}
-        response = requests.post(url, files=files)
-        if response.json()['status'] == 'success':
-            return ["filename"].split("/")[-1]
+        with open(jar_path, 'rb') as jar_file:
+            files = {'file': jar_file}
+            response = requests.post(url, files=files, timeout=1000)
+            print(response.json())
+            if response.json()['status'] == 'success':
+                return response.json()["filename"].split("/")[-1]
 
     def list_jars(self):
         url = self.host + "/jars"
@@ -64,40 +70,43 @@ class MyFlink:
         response = requests.delete(url)
         return response.json()
 
-    # Example
-    # [
-    #     "--input", "/path/to/input/file",
-    #     "--output", "/path/to/output/file",
-    # ]
     def start_job(self, jar_id, program_args):
-        url = f"{self.host}/jars/{jar_id}/plan"
-        response = requests.get(url)
-        entry_class = response.json()["entryClass"]
-
-        url = self.host + f"/jars/${jar_id}/run"
-        json_data = {
-            "entryClass": entry_class,
-            "programArgs": program_args
-        }
-        response = requests.post(url, json=json_data)
-        return response.json()
-
-
-def print_json_pretty(obj):
-    print(json.dumps(obj, indent=4, ensure_ascii=False))
+        program_args = " ".join(program_args.strip().split())
+        print(f"program_args 入参:{program_args}")
+        jars = self.list_jars()
+        for jar in jars:
+            if jar['id'] == jar_id:
+                entry_class = jar['entry'][0]['name']
+                url = self.host + f"/jars/{jar_id}/run"
+                json_data = {
+                    "entryClass": entry_class,
+                    "programArgs": program_args
+                }
+                response = requests.post(url, json=json_data)
+                return response.json()
+        return "未找到jar包"
+
+
+class MyTest(unittest.TestCase):
+
+    def test_stop_all(self):
+        flink = MyFlink("https://flink.rxdptest.k5.bigtree.tech")
+        flink.stop_all()
+
+    def test_upload_start(self):
+        flink = MyFlink("https://flink.rxdptest.k5.bigtree.tech")
+        jar_id = flink.upload_jar("/Users/alvin/bigtree/rxdp-dm-jobs/rxdp-dm-jobs-flink/target/rxdp-dm-jobs-flink-1.0-SNAPSHOT.jar")
+        print(f"jar_id: {jar_id}")
+        jar_id = "b33b6c52-0e68-4f11-b6af-aeb7ffb116bf_rxdp-dm-jobs-flink-1.0-SNAPSHOT.jar"
+        print(flink.start_job(jar_id, """
+        --kafkaServer kafka-0.kafka-headless.rxdptest.svc.k5.bigtree.zone:9092,kafka-1.kafka-headless.rxdptest.svc.k5.bigtree.zone:9092,kafka-2.kafka-headless.rxdptest.svc.k5.bigtree.zone:9092
+        --esServer elasticsearch-master.rxdptest.svc.k5.bigtree.zone
+        --esPort 9200 
+        --pythonUrl http://py-invoke-svc:8000
+        --redisIp redis-master.rxdptest.svc.k5.bigtree.zone
+        --redisPassword SB6vdem
+        """))
 
 
 if __name__ == '__main__':
-    # flink = MyFlink("https://flink.rxdpdev.k5.bigtree.tech")
-
-    # print(flink.upload_jar("/Users/alvin/bigtree/rxdp-dm-jobs/rxdp-dm-jobs-flink/target/rxdp-dm-jobs-flink-1.0-SNAPSHOT.jar"))
-    # print_json_pretty(flink.list_jars())
-    flink = MyFlink("http://121.37.84.96:8082")
-    jars = flink.list_jars()
-    # print(jars)
-    # for jar in jars:
-    #     jar_id = jar['id']
-    #     if "SNAPSHOT" not in jar_id:
-    #         print(jar_id)
-    #         print(flink.delete_jar(jar_id))
-    # print(flink.delete_jar("e9d0ec97-bcfd-4501-848e-23be742c5038_main.jar"))
+    pass
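
A minimal usage sketch of the revised MyFlink class (the host URL, jar path, and program args below are placeholders, not values from this commit):

    flink = MyFlink("http://localhost:8081")
    jar_id = flink.upload_jar("/path/to/job.jar")
    print(flink.start_job(jar_id, "--input /tmp/in --output /tmp/out"))
    print(flink.list_jobs())
    flink.stop_all()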

+ 16 - 14
sh/tmp.sh

@@ -1,16 +1,18 @@
-getNamespace() {
-  read "namespace?aircpdev aircptest aircpsl
-rxdpdev rxdptest rxdpsl: "
-  if [ "${namespace}" ]; then
-    echo "${namespace}"
-  else
-    echo "rxdpdev"
-  fi
-}
+#!/bin/bash
 
+# Path to the .zshrc file
+ZSHRC="$HOME/.zshrc"
 
-funmv(){
-   newfile=`echo $1 | sed "s/$2/$3/g"`
-   mv $1 $newfile
-}
-funmv
+# Check whether a KUBECONFIG line exists
+if grep -q "export KUBECONFIG" "$ZSHRC"; then
+    # It exists; check whether it is commented out
+    if grep -q "^#export KUBECONFIG" "$ZSHRC"; then
+        # Commented out: remove the comment marker
+        sed -i '' '/^#export KUBECONFIG/s/^#//' "$ZSHRC"
+    else
+        # Not commented: comment it out
+        sed -i '' '/^export KUBECONFIG/s/^/#/' "$ZSHRC"
+    fi
+else
+    echo "No export KUBECONFIG line found"
+fi
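
Note: the sed -i '' form used above is the BSD/macOS syntax (an explicit empty backup suffix); GNU sed on Linux takes -i with no argument, so this script as written is macOS-specific.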

+ 2 - 6
test_py_invoke.py

@@ -12,14 +12,10 @@ def exec_def(cmd_str, obj):
 # ================[ Custom section Start ]===============
 exec_str = """
 def exec(obj):
-    import json
-    obj["a"] = 123
     return obj
 """
-obj = {"b": 45}
+obj = {"b": 45, 'zScore': None}
 # ================[ Custom section End ]===============
 
 
-exec_def(exec_str, obj)
-
-
+print(exec_def(exec_str, obj))
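
The body of exec_def sits outside this hunk; presumably it executes the code string and then invokes the exec function it defines. A minimal sketch of such a helper, assuming that behavior:

    def exec_def(cmd_str, obj):
        scope = {}
        exec(cmd_str, scope)       # define exec(obj) in a fresh namespace
        return scope['exec'](obj)  # call it and return the result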

+ 33 - 2
tmp.py

@@ -1,3 +1,34 @@
-import time
+# Import the required libraries
+import pandas as pd
+from catboost import CatBoostClassifier, Pool
+from sklearn.model_selection import train_test_split
 
-print(time.time())
+# Load the sample dataset
+data = pd.read_csv('mushroom.csv')
+X = pd.get_dummies(data.drop('class', axis=1))
+y = data['class']
+X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
+
+# Define and train a CatBoost classifier
+model = CatBoostClassifier(iterations=100, depth=2, learning_rate=0.1, loss_function='Logloss')
+model.fit(X_train, y_train, verbose=False)
+
+# Evaluate model performance
+print('Train accuracy:', model.score(X_train, y_train))
+print('Test accuracy:', model.score(X_test, y_test))
+
+# Save the model
+model.save_model('catboost_model.bin')
+
+# Load the model
+loaded_model = CatBoostClassifier()
+loaded_model.load_model('catboost_model.bin')
+
+# X_test = [['x', 's', 'n', 't', 'p', 'f', 'c', 'n', 'k', 'e', 'e', 's', 's', 'w', 'w', 'p', 'w', 'o', 'p', 'k', 's', 'u']]
+# Use the model for prediction
+preds_class = loaded_model.predict(X_test)
+preds_proba = loaded_model.predict_proba(X_test)
+
+# Print the predictions
+print('Predicted classes:', preds_class)
+print('Predicted probabilities:', preds_proba)
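
One caveat with the pd.get_dummies flow above: a new raw sample must be one-hot encoded into the same column layout the model was trained on, or predict will hit a column mismatch. A minimal sketch reusing X and loaded_model from above (the column names are assumed for illustration, not taken from this commit):

    raw = pd.DataFrame([['x', 's', 'n']], columns=['cap-shape', 'cap-surface', 'cap-color'])
    encoded = pd.get_dummies(raw).reindex(columns=X.columns, fill_value=0)
    print(loaded_model.predict(encoded))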

+ 56 - 7
tmp1.py

@@ -1,9 +1,58 @@
-def print_time(code_str, number=1):
-    import timeit
-    sum_result = timeit.timeit(code_str, number=number, globals=globals(), setup='pass')
-    sum_result *= 1000
-    print(f"执行时间: {int(sum_result)} 毫秒")
+import pandas as pd
+from catboost import CatBoostClassifier, Pool
+from sklearn.model_selection import train_test_split
 
 
-a = 123
-print_time("print(a)")
+def train_model(data_path, model_path):
+    # Load the dataset
+    data = pd.read_csv(data_path)
+    print(data)
+    X = pd.get_dummies(data.drop('class', axis=1))
+    y = data['class']
+
+    # Split into training and test sets
+    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
+
+    # Define and train a CatBoost classifier
+    model = CatBoostClassifier(iterations=100, depth=2, learning_rate=0.1, loss_function='Logloss')
+    model.fit(X_train, y_train, verbose=False)
+
+    # Save the model
+    model.save_model(model_path)
+
+    # Return the model, the raw feature names, and the one-hot column layout
+    return model, list(data.drop('class', axis=1).columns), list(X.columns)
+
+
+def predict(model_path, input_data, input_columns, dummy_columns):
+    # Load the model
+    loaded_model = CatBoostClassifier()
+    loaded_model.load_model(model_path)
+
+    # Encode the raw input and align its columns with the training layout
+    input_df = pd.DataFrame(input_data, columns=input_columns)
+    input_df = pd.get_dummies(input_df).reindex(columns=dummy_columns, fill_value=0)
+
+    # Use the model for prediction
+    preds_class = loaded_model.predict(input_df)
+    preds_proba = loaded_model.predict_proba(input_df)
+
+    # Return the predictions
+    return preds_class, preds_proba
+
+
+data_path = 'mushroom.csv'
+model_path = 'catboost_model.bin'
+
+# Train the model
+trained_model, input_columns, dummy_columns = train_model(data_path, model_path)
+
+# Example input (raw feature values)
+input_data = [['x', 's', 'n', 't', 'p', 'f', 'c', 'n', 'k', 'e', 'e', 's', 's', 'w', 'w', 'p', 'w', 'o', 'p', 'k', 's', 'u']]
+
+# Run the prediction
+preds_class, preds_proba = predict(model_path, input_data, input_columns, dummy_columns)
+
+# Print the predictions
+print('Predicted classes:', preds_class)
+print('Predicted probabilities:', preds_proba)

+ 34 - 0
tmp2.py

@@ -0,0 +1,34 @@
+# Create an Elasticsearch client (plain REST requests)
+
+import requests
+import json
+
+# Elasticsearch address
+base_url = 'http://elasticsearch-master.rxdpdev.svc.k5.bigtree.zone:9200'
+
+
+# Insert one document
+def insert_data(id, name):
+    url = f'{base_url}/rxdp_tag_all/_doc/{id}'
+    doc = {
+        'id': id,
+        'name': name
+    }
+    headers = {'Content-Type': 'application/json'}
+    response = requests.put(url, data=json.dumps(doc), headers=headers)
+    # print(response.json())
+
+
+# Query a document by ID
+def get_data_by_id(id):
+    url = f'{base_url}/rxdp_tag_all/_doc/{id}'
+    headers = {'Content-Type': 'application/json'}
+    response = requests.get(url, headers=headers)
+    print(response.json()['_source'])
+
+
+# Call the insert function
+insert_data(2, 'John Doe')
+
+# Call the query-by-ID function
+get_data_by_id(2)
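
For completeness, querying by a field value (rather than by _id) goes through the _search endpoint. A minimal sketch reusing base_url from above; note that, unlike the realtime GET by _id, freshly indexed documents only show up in _search after the index refreshes:

    def search_by_name(name):
        url = f'{base_url}/rxdp_tag_all/_search'
        query = {'query': {'match': {'name': name}}}
        response = requests.post(url, json=query)
        return [hit['_source'] for hit in response.json()['hits']['hits']]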

+ 1 - 0
tmp3.py

@@ -0,0 +1 @@
+123

+ 113 - 1
tmp4.py

@@ -1 +1,113 @@
-123
+from sqlalchemy import create_engine, text
+from sqlalchemy.orm import sessionmaker
+
+
+class SqlEngine:
+    def __init__(self, mysql_dict):
+        from urllib.parse import quote_plus
+        # Recycle connections every 280 seconds
+        self.DBSession = sessionmaker(bind=create_engine(
+            f"mysql://{mysql_dict['user']}:{quote_plus(mysql_dict['password'])}@{mysql_dict['host']}:{mysql_dict['port']}/{mysql_dict['db']}?charset=utf8mb4&serverTimezone=Asia/Shanghai",
+            pool_size=2, max_overflow=0, pool_recycle=280))
+
+    def insert(self, item):
+        with self.DBSession() as se:
+            se.add(item)
+            se.commit()
+            return item.id
+
+    def insert_all(self, items):
+        with self.DBSession() as se:
+            se.bulk_save_objects(items)
+            se.commit()
+
+    def select_all(self, model, filters=None, order_bys=None, offset=None, limit=None):
+        """
+        :param limit:
+        :param offset:
+        :param order_bys:
+                e.g. User.name.desc()
+        :param model:
+        :param filters:
+            User.name == 'James'
+            User.name != 'James'
+            User.name.like('%e%')
+            User.name.in_(['Kobe', 'James'])
+        :return:
+        """
+        with self.DBSession() as se:
+            query = self.get_query(filters, se, model, order_bys, offset, limit)
+            return query.all()
+
+    def select_page(self, model, filters, order_bys=None, offset=0, limit=10):
+        count = self.select_count(model, filters)
+        res_list = self.select_all(model, filters, order_bys, offset, limit)
+        return {
+            "count": count,
+            "list": res_list
+        }
+
+    def select_count(self, model, filters=None):
+        with self.DBSession() as se:
+            query = self.get_query(filters, se, model)
+            return query.count()
+
+    def select_first(self, model, filters=None):
+        with self.DBSession() as se:
+            query = self.get_query(filters, se, model)
+            return query.first()
+
+    def update(self, model, filters, dict_new):
+        with self.DBSession() as se:
+            query = self.get_query(filters, se, model)
+            res = query.update(dict_new)
+            se.commit()
+            return res
+
+    @staticmethod
+    def get_query(filters, se, model, order_bys=None, offset=None, limit=None):
+        if filters is None:
+            filters = []
+        if order_bys is None:
+            order_bys = []
+        query = se.query(model)
+        for _filter in filters:
+            query = query.filter(_filter)
+        if len(order_bys) > 0:
+            query = query.order_by(*order_bys)
+        if offset:
+            query = query.offset(offset)
+        if limit:
+            query = query.limit(limit)
+        return query
+
+    def delete(self, model, filters):
+        with self.DBSession() as se:
+            query = self.get_query(filters, se, model)
+            res = query.delete()
+            se.commit()
+            return res
+
+    def select_sql(self, sql, params=None):
+        """
+        Run a raw SQL query
+        :param sql: "SELECT * FROM users WHERE age > :age"
+        :param params: {'age': 18}
+        :return:
+        """
+        with self.DBSession() as se:
+            if params:
+                stmt = text(sql).bindparams(**params)
+                result = se.execute(stmt)
+            else:
+                result = se.execute(text(sql))
+            # Fetch inside the session so the result is not read after close
+            return result.fetchall()
+
+    def execute_sql(self, sql, params=None):
+        with self.DBSession() as se:
+            if params:
+                stmt = text(sql).bindparams(**params)
+                result = se.execute(stmt)
+            else:
+                result = se.execute(text(sql))
+            # Commit so INSERT/UPDATE/DELETE statements actually persist
+            se.commit()
+            return result.rowcount
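
A minimal usage sketch of SqlEngine (the connection details and the User model below are hypothetical, and a MySQL driver such as mysqlclient is assumed to be installed):

    from sqlalchemy import Column, Integer, String
    from sqlalchemy.orm import declarative_base

    Base = declarative_base()

    class User(Base):  # hypothetical model for illustration
        __tablename__ = 'users'
        id = Column(Integer, primary_key=True)
        name = Column(String(64))
        age = Column(Integer)

    engine = SqlEngine({'user': 'root', 'password': 'secret',
                        'host': '127.0.0.1', 'port': 3306, 'db': 'demo'})
    engine.insert(User(name='Kobe', age=18))
    print(engine.select_all(User, filters=[User.age > 10], limit=5))
    print(engine.select_sql("SELECT * FROM users WHERE age > :age", {'age': 10}))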