翼度科技»论坛 编程开发 python 查看内容

ray-分布式计算框架-集群与异步Job管理

4

主题

4

帖子

12

积分

新手上路

Rank: 1

积分
12
0. ray 简介

ray是开源分布式计算框架,为并行处理提供计算层,用于扩展AI与Python应用程序,是ML工作负载统一工具包



  • Ray AI Runtime
ML应用程序库集


  • Ray Core
通用分布式计算库

  • Task -- Ray允许任意Python函数在单独的Python worker上运行,这些异步Python函数称为任务
  • Actor -- 从函数扩展到类,是一个有状态的工作者,当一个Actor被创建,一个新的worker被创建,并且actor的方法被安排到那个特定的worker上,并且可以访问和修改那个worker的状态
  • Object -- Task与Actor在对象上创建与计算,被称为远程对象,被存储在ray的分布式共享内存对象存储上,通过对象引用来引用远程对象。集群中每个节点都有一个对象存储,远程对象存储在何处(一个或多个节点上)与远程对象引用的持有者无关
  • Placement Groups -- 允许用户跨多个节点原子性的保留资源组,以供后续Task与Actor使用
  • Environment Dependencies --  当Ray在远程机器上执行Task或Actor时,它们的依赖环境项(Python包、本地文件、环境变量)必须可供代码运行。解决环境依赖的方式有两种,一种是在集群启动前准备好对集群的依赖,另一种是在ray的运行时环境动态安装


  • Ray cluster
一组连接到公共 Ray 头节点的工作节点,通过 kubeRay operator管理运行在k8s上的ray集群

1. ray 集群管理

ray版本:2.3.0


  • Kind 创建测试k8s集群
1主3从集群
  1. # 配置文件 -- 一主两从(默认单主),文件名:k8s-3nodes.yaml
  2. kind: Cluster
  3. apiVersion: kind.x-k8s.io/v1alpha4
  4. nodes:
  5. - role: control-plane
  6. - role: worker
  7. - role: worker
复制代码
创建k8s集群
  1. kind create cluster --config k8s-3nodes.yaml
复制代码

  • KubeRay 部署ray集群
  1. # helm方式安装
  2. # 添加Charts仓库
  3. helm repo add kuberay https://ray-project.github.io/kuberay-helm/
  4. # 安装default名称空间
  5. # 安装 kubeRay operator
  6. # 下载离线的chart包: helm pull kuberay/kuberay-operator --version 0.5.0
  7. # 本地安装: helm install kuberay-operator
  8. helm install kuberay-operator kuberay/kuberay-operator --version 0.5.0
  9. # 创建ray示例集群,若通过sdk管理则跳过
  10. # 下载离线的ray集群自定义资源:helm pull kuberay/ray-cluster  --version 0.5.0
  11. helm install raycluster kuberay/ray-cluster --version 0.5.0
  12. # 获取ray集群对应的CR
  13. kubectl get raycluster
  14. # 查询pod的状态
  15. kubectl get pods
  16. # 转发svc 8265端口到本地8265端口
  17. kubectl port-forward --address 0.0.0.0 svc/raycluster-kuberay-head-svc 8265:8265
  18. # 登录ray head节点,并执行一个job
  19. kubectl exec -it ${RAYCLUSTER_HEAD_POD} -- bash
  20. python -c "import ray; ray.init(); print(ray.cluster_resources())" # (in Ray head Pod)
  21. # 删除ray集群
  22. helm uninstall raycluster
  23. # 删除kubeRay
  24. helm uninstall kuberay-operator
  25. # 查询helm管理的资源
  26. helm ls --all-namespaces
复制代码

  • Ray 集群管理
前置要求:
  1. from python_client import kuberay_cluster_api
  2. from python_client.utils import kuberay_cluster_utils, kuberay_cluster_builder
  3. def main():
  4.    
  5.     # ray集群管理的api 获取集群列表、创建集群、更新集群、删除集群
  6.     kuberay_api = kuberay_cluster_api.RayClusterApi()
  7.     # CR 构建器,构建ray集群对应的字典格式的CR
  8.     cr_builder = kuberay_cluster_builder.ClusterBuilder()
  9.     # CR资源对象操作工具,更新cr资源
  10.     cluster_utils = kuberay_cluster_utils.ClusterUtils()
  11.     # 构建集群的CR,是一个字典对象,可以修改、删除、添加额外的属性
  12.     # 可以指定包含特定环境依赖的人ray镜像
  13.     cluster = (
  14.         cr_builder.build_meta(name="new-cluster1", labels={"demo-cluster": "yes"}) # 输入ray群名称、名称空间、资源标签、ray版本信息
  15.         .build_head(cpu_requests="0", memory_requests="0")   # ray集群head信息: ray镜像名称、对应service类型、cpu memory的requests与limits、ray head启动参数
  16.         .build_worker(group_name="workers", cpu_requests="0", memory_requests="0") # ray集群worker信息: worker组名称、 ray镜像名称、ray启动命令、cpu memory的requests与limits、默认副本个数、最大与最小副本个数
  17.         .get_cluster()
  18.     )
  19.    
  20.     # 检查CR是否构建成功
  21.     if not cr_builder.succeeded:
  22.         print("error building the cluster, aborting...")
  23.         return
  24.     # 创建ray集群
  25.     kuberay_api.create_ray_cluster(body=cluster)
  26.     # 更新ray集群CR中的worker副本集合
  27.     cluster_to_patch, succeeded = cluster_utils.update_worker_group_replicas(
  28.         cluster, group_name="workers", max_replicas=4, min_replicas=1, replicas=2
  29.     )
  30.     if succeeded:
  31.         # 更新ray集群
  32.         kuberay_api.patch_ray_cluster(
  33.             name=cluster_to_patch["metadata"]["name"], ray_patch=cluster_to_patch
  34.         )
  35.     # 在原来的集群的CR中的工作组添加新的工作组
  36.     cluster_to_patch, succeeded = cluster_utils.duplicate_worker_group(
  37.         cluster, group_name="workers", new_group_name="duplicate-workers"
  38.     )
  39.     if succeeded:
  40.         kuberay_api.patch_ray_cluster(
  41.             name=cluster_to_patch["metadata"]["name"], ray_patch=cluster_to_patch
  42.         )
  43.     # 列出所有创建的集群
  44.     kube_ray_list = kuberay_api.list_ray_clusters(k8s_namespace="default", label_selector='demo-cluster=yes')
  45.     if "items" in kube_ray_list:
  46.         for cluster in kube_ray_list["items"]:
  47.             print(cluster["metadata"]["name"], cluster["metadata"]["namespace"])
  48.     # 删除集群
  49.     if "items" in kube_ray_list:
  50.         for cluster in kube_ray_list["items"]:
  51.             print("deleting raycluster = {}".format(cluster["metadata"]["name"]))
  52.             
  53.             # 通过指定名称删除ray集群
  54.             kuberay_api.delete_ray_cluster(
  55.                 name=cluster["metadata"]["name"],
  56.                 k8s_namespace=cluster["metadata"]["namespace"],
  57.             )
  58. if __name__ == "__main__":
  59.     main()
复制代码
2. ray Job 管理

前置: pip install -U "ray[default]"


  • 创建一个job任务
  1. # 文件名称: test_job.py
  2. # python 标准库
  3. import json
  4. import ray
  5. import sys
  6. # 已经在ray节点安装的库
  7. import redis
  8. # 通过job提交时传递的模块依赖 runtime_env 配置 py_modules,通过 py_nodules传递过来就可以直接在job中导入
  9. from test_module import test_1
  10. import stk12
  11. # 创建一个连接redeis对象,通过redis作为中转向job传递输入并获取job的输出
  12. redis_cli = redis.Redis(host='192.168.6.205', port=6379,  decode_responses=True)
  13. # 通过redis获取传入过来的参数
  14. input_params_value = None
  15. if len(sys.argv) > 1:
  16.     input_params_key = sys.argv[1]
  17.     input_params_value = json.loads(redis_cli.get(input_params_key))
  18. # 执行远程任务
  19. @ray.remote
  20. def hello_world(value):
  21.     return [v + 100 for v in value]
  22. ray.init()
  23. # 输出传递过来的参数
  24. print("input_params_value:", input_params_value, type(input_params_key))
  25. # 执行远程函数
  26. result = ray.get(hello_world.remote(input_params_value))
  27. # 获取输出key
  28. output_key = input_params_key.split(":")[0] + ":output"
  29. # 将输出结果放入redis
  30. redis_cli.set(output_key, json.dumps(result))
  31. # 测试传递过来的Python依赖库是否能正常导入
  32. print(test_1.test_1())
  33. print(stk12.__dir__())
复制代码

  • 创建测试自定义模块
  1. # 模块路径: test_module/test_1.py
  2. def test_1():
  3.     return "test_1"
复制代码

  • 创建一个job提交对象
[code]import jsonfrom ray.job_submission import JobSubmissionClient, JobStatusimport timeimport uuidimport redis# 上传un到ray集群供job使用的模块import test_modulefrom agi import stk12# 创建一个连接redeis对象redis_cli = redis.Redis(host='192.168.6.205', port=6379,  decode_responses=True)# 创建一个client,指定远程ray集群的head地址client = JobSubmissionClient("http://127.0.0.1:8265")# 创建任务的IDid = uuid.uuid4().hexinput_params_key = f"{id}:input"input_params_value = [1, 2, 3, 4, 5]# 将输入参数存入redis,供远程函数job使用redis_cli.set(input_params_key, json.dumps(input_params_value))# 提交一个ray job 是一个独立的ray应用程序job_id = client.submit_job(    # 执行该job的入口脚本    entrypoint=f"python test_job.py {input_params_key}",    # 将本地文件上传到ray集群    runtime_env={        "working_dir": "./",        "py_modules": [test_module, stk12],        "env_vars": {"testenv": "test-1"}    },    # 自定义任务ID    submission_id=f"{id}")# 输出job IDprint("job_id:", job_id)def wait_until_status(job_id, status_to_wait_for, timeout_seconds=5):    """轮询获取Job的状态,当完成时获取任务的的日志输出"""    start = time.time()    while time.time() - start

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具