```python
# larger dataset with 1000-year time slices
import xarray as xr
import gcsfs

gcs = gcsfs.GCSFileSystem(token=None, access='read_only')
```
```python
snapshots1ka = xr.open_zarr(gcs.get_mapper('gs://ldeo-glaciology/paleo_ensemble/snapshots1ka.zarr'))
```
```python
# snapshots1ka
# These chunks are HUGE (we usually recommend chunks of around 100 MB; these are around 900 MB!)
```
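The chunk size can be checked directly from the underlying dask arrays. This is a minimal sketch (not part of the original notebook), using `thk` as a representative variable and assuming `snapshots1ka` has been opened as above.

```python
import numpy as np

# Estimate the size of one chunk: elements per chunk * bytes per element, converted to MB.
da = snapshots1ka.thk.data  # the underlying dask array
chunk_mb = np.prod(da.chunksize) * da.dtype.itemsize / 1e6
print(f"chunk shape: {da.chunksize}, ~{chunk_mb:.0f} MB per chunk")
```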
|
```python
# Proposed solution
# snapshots1ka = snapshots1ka.chunk(chunks={'time': 1})
# chunks={'par_esia': -1, 'time': 1} gives 150 MB chunks... quite large...
snapshots1ka
```
```
<xarray.Dataset>
Dimensions:     (time: 125, y: 381, x: 381, par_esia: 4, par_ppq: 4, par_prec: 4, par_visc: 4)
Coordinates:
  * par_esia    (par_esia) float64 1.0 2.0 4.0 7.0
  * par_ppq     (par_ppq) float64 0.25 0.5 0.75 1.0
  * par_prec    (par_prec) float64 0.02 0.05 0.07 0.1
  * par_visc    (par_visc) float64 1e+20 5e+20 2.5e+21 1e+22
  * time        (time) float64 -1.24e+05 -1.23e+05 -1.22e+05 ... -1e+03 0.0
  * x           (x) float64 -3.04e+06 -3.024e+06 ... 3.024e+06 3.04e+06
  * y           (y) float64 -3.04e+06 -3.024e+06 ... 3.024e+06 3.04e+06
Data variables:
    bmelt       (time, y, x, par_esia, par_ppq, par_prec, par_visc) float32 dask.array<chunksize=(25, 381, 381, 1, 4, 4, 4), meta=np.ndarray>
    dbdt        (time, y, x, par_esia, par_ppq, par_prec, par_visc) float32 dask.array<chunksize=(25, 381, 381, 1, 4, 4, 4), meta=np.ndarray>
    index       (par_esia, par_ppq, par_prec, par_visc) int64 dask.array<chunksize=(4, 4, 4, 4), meta=np.ndarray>
    mask        (time, y, x, par_esia, par_ppq, par_prec, par_visc) int8 dask.array<chunksize=(25, 381, 381, 1, 4, 4, 4), meta=np.ndarray>
    score       (time, par_esia, par_ppq, par_prec, par_visc) float64 dask.array<chunksize=(125, 4, 4, 4, 4), meta=np.ndarray>
    thk         (time, y, x, par_esia, par_ppq, par_prec, par_visc) float32 dask.array<chunksize=(25, 381, 381, 1, 4, 4, 4), meta=np.ndarray>
    topg        (time, y, x, par_esia, par_ppq, par_prec, par_visc) float32 dask.array<chunksize=(25, 381, 381, 1, 4, 4, 4), meta=np.ndarray>
    usurf       (time, y, x, par_esia, par_ppq, par_prec, par_visc) float32 dask.array<chunksize=(25, 381, 381, 1, 4, 4, 4), meta=np.ndarray>
    velbar_mag  (time, y, x, par_esia, par_ppq, par_prec, par_visc) float32 dask.array<chunksize=(25, 381, 381, 1, 4, 4, 4), meta=np.ndarray>
Attributes:
    Conventions:      CF-1.5
    parameter_space:  {'visc': [1e+20, 5e+20, 2.5e+21, 1e+22], 'sia_e': [1.0,...
    proj4:            +lon_0=0.0 +ellps=WGS84 +datum=WGS84 +lat_ts=-71.0 +pro...
    source:           PISM (stable v1.0-123-gf2e24e88f committed by Torsten A...
```
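To make the proposed rechunking concrete, here is a sketch (not from the original notebook) that applies the second option mentioned in the comments and checks the resulting chunk shape; the ~150 MB figure follows from the same arithmetic as above.

```python
# Split time into single slices but keep par_esia whole, as suggested in the comment above.
rechunked = snapshots1ka.chunk({'par_esia': -1, 'time': 1})
print(rechunked.thk.data.chunksize)  # expected: (1, 381, 381, 4, 4, 4, 4), ~150 MB per float32 chunk
```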
```python
from dask.distributed import Client
import dask_gateway

gateway = dask_gateway.Gateway()
cluster = gateway.new_cluster()
```
Solution: Increase worker memory to accommodate large chunks
```python
# from dask.distributed import Client
# import dask_gateway
# gateway = dask_gateway.Gateway()

# # close existing clusters
# open_clusters = gateway.list_clusters()
# print(list(open_clusters))
# if len(open_clusters) > 0:
#     for c in open_clusters:
#         cluster = gateway.connect(c.name)
#         cluster.shutdown()

# # You can actually adjust the memory (and cores) of your workers! For this example we need more memory,
# # since the standard worker (4 GB RAM) can barely load in 4 datasets at once!
# options = gateway.cluster_options()
# options.worker_memory = 16
# cluster = gateway.new_cluster(cluster_options=options)
```
[ClusterReport<name=prod.ae0e1f43bb884f9db463f77f30fb460b, status=RUNNING>]
```python
cluster.scale(20)
client = Client(cluster)
```
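Scaling is asynchronous, so computations submitted immediately after `cluster.scale(20)` may start on only a few workers. An optional extra step (not in the original notebook) is to block until the requested workers have arrived:

```python
# Wait until the 20 requested workers have actually joined the cluster.
client.wait_for_workers(n_workers=20)
```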
Let's show the dashboard link for the cluster to see what is going wrong
```python
client
```
|
```
Client: Client-09a689b8-611d-11ed-82c5-a2e042606cc6
Connection method: Cluster object (dask_gateway.GatewayCluster)
Cluster name: prod.ce27cd039adb423b9995e370373c7467
Dashboard: /services/dask-gateway/clusters/prod.ce27cd039adb423b9995e370373c7467/status
```
When you shift + right click on the dashboard link above and select "copy link", you can paste it into the Dask panel on the right.
```python
# Area of one grid cell in m^2, from the grid spacing stored in the coordinate attributes
cellArea = snapshots1ka.x.attrs['spacing_meters'] * snapshots1ka.y.attrs['spacing_meters']
```
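As a cross-check (not part of the original notebook), the same cell area can be derived from the coordinate values themselves, assuming a regular grid:

```python
# Derive the grid spacing from the first two coordinate values and compare with cellArea.
dx = float(snapshots1ka.x[1] - snapshots1ka.x[0])
dy = float(snapshots1ka.y[1] - snapshots1ka.y[0])
print(dx * dy, cellArea)  # these should agree if the attributes describe the same regular grid
```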
```python
# Preprocessing data
V = (snapshots1ka.thk.where(snapshots1ka.mask == 2).sum(['x', 'y']) * cellArea)  # .persist()
```
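The commented-out `.persist()` hints at keeping the reduced volume in worker memory so it is only computed once. A sketch of that optional workflow (worthwhile mainly now that the workers have enough memory, as discussed above):

```python
from dask.distributed import progress

# Persist the reduction on the cluster and watch its progress in the notebook.
V = V.persist()
progress(V)
```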
```python
# Computing the mean volume at each time step across the ensemble
Vmean = V.mean(['par_esia', 'par_ppq', 'par_prec', 'par_visc'])
```
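Averaging over the four parameter dimensions leaves only `time`, so the final result is tiny even though producing it touches the whole dataset. A quick sanity check (added here, not in the original notebook):

```python
# Vmean should be one value per time slice.
print(Vmean.dims, Vmean.shape)  # expected: ('time',) (125,)
```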
```python
# Vmean_test = V.mean(['par_ppq', 'par_prec', 'par_visc']).load()
```
|
```python
# Plotting mean volume
# Got error
Vmean.plot(size=10)
```
[<matplotlib.lines.Line2D at 0x7fe250666b80>]