import copy
from collections.abc import Sequence
from typing import Optional
import numpy as np
import xarray
from scipy.constants import mu_0
from shapely import Polygon, Point
import pleque
from pleque.utils.decorators import deprecated, ordered_path_scalar_function, scalar_function, vector_function
from scipy.interpolate import RectBivariateSpline, UnivariateSpline
from pleque.core import Coordinates
from pleque.utils.tools import arglis, xp_sections
from pleque.core import FluxFunctions, Surface # , FluxSurface
from pleque.core import SurfaceFunctions
from pleque.core import cocos as cc
import pleque.utils.equi_tools as eq_tools
import pleque.utils.surfaces as surf
import pleque.utils.flux_expansions as flux_expansion
from pleque.utils.surfaces import track_plasma_boundary
from pleque.config.settings import get_settings
settings = get_settings()
[docs]
class Equilibrium(object):
"""
Equilibrium class ...
"""
@staticmethod
def _shape_spline_result(value, grid=False):
"""
Convert SciPy spline grid output to PLEQUE's public grid layout.
RectBivariateSpline returns true grids as (n_R, n_Z). PLEQUE exposes
grid-shaped data as (n_Z, n_R), matching np.meshgrid(R, Z). For
grid=False SciPy evaluates elementwise and already preserves the input
array shape, so no transpose is applied.
"""
return value.T if grid else value
# def __init__(self,
# basedata: xarray.Dataset,
# first_wall=None: Iterable[(float, float)],
# psi_lcfs=None: float,
# X_points=None: Iterable[(float, float)],
# strike_points=None: Iterable[(float, float)],
# spline_order=5: int,
# cocos=3: int,
# ):
[docs]
def __init__(self,
basedata: xarray.Dataset,
first_wall=None,
mg_axis=None,
psi_lcfs=None,
x_points=None,
strike_points=None,
init_method="hints",
spline_order=3,
spline_smooth=0,
find_extremes_order=20,
cocos=None,
verbose=False,
):
"""
Equilibrium class instance should be obtained generally by functions in pleque.io
package.
Optional arguments may help the initialization.
:param basedata: xarray.Dataset with psi(R, Z) on a rectangular R, Z grid, f(psi_norm), p(psi_norm)
f = B_tor * R
:param first_wall: array-like (Nwall, 2) required for initialization in case of limiter configuration.
:param mg_axis: suspected position of the o-point
:param psi_lcfs:
:param x_points:
:param strike_points:
:param init_method: str On of ("full", "hints", "fast_forward").
If "full" no hints are taken and module tries to recognize all critical points itself.
If "hints" module use given optional arguments as a help with initialization.
If "fast-forward" module use given optional arguments as final and doesn't try to correct.
*Note:* Only "hints" method is currently tested.
:param spline_order:
:param spline_smooth:
:param find_extremes_order: Define number of points on internal grid used for identifying
magnetic axis and x-points.
:param cocos: At the moment module assume cocos to be 3 (no other option). The implemetnation is not fully
working. Be aware of signs in the module!
:param verbose:
"""
if verbose:
print('---------------------------------')
print('Equilibrium module initialization')
print('---------------------------------')
if cocos is None:
if "cocos" in basedata.attrs:
cocos = basedata.attrs["cocos"]
elif "cocos" in basedata:
cocos = basedata["cocos"]
else:
cocos = 3
self._basedata = basedata
self._verbose = verbose
self._mg_axis = mg_axis
self._psi_lcfs = psi_lcfs
self._x_points = x_points
self._strike_points = strike_points
self._spline_order = spline_order
# TODO TODO TODO
self._init_method = init_method
self._cocos = cocos
self._cocosdic = cc.cocos_coefs(cocos)
# todo: resolve this from input (for COCOS time) TODO TODO TODO
self._Bpol_sign = 1
if self._verbose:
print(f"Equilibrium initialized with cocos={self._cocos} and init_method={self._init_method}")
self._flux_surfaces = None
try:
r = basedata.R.values
z = basedata.Z.values
psi = basedata.psi.transpose('R', 'Z').values
if first_wall is None:
if 'first_wall' in basedata:
self._first_wall = basedata["first_wall"].values
elif 'R_first_wall' in basedata and 'Z_first_wall' in basedata:
self._first_wall = np.array([basedata.R_first_wall.values,
basedata.Z_first_wall.values]).T
elif 'r_lim' in basedata and 'z_lim' in basedata:
self._first_wall = np.array([basedata.r_lim.values,
basedata.z_lim.values]).T
else:
rwall_min = np.min(r)
rwall_max = np.max(r)
zwall_min = np.min(z)
zwall_max = np.max(z)
dr = rwall_max - rwall_min
dz = zwall_max - zwall_min
# todo: remove this if possible
# lets reduce the wall a bit to be have some plasma behind the wall
rwall_min += dr / 100
rwall_max -= dr / 100
zwall_min += dz / 100
zwall_max -= dz / 100
corners = np.array(
[[rwall_min, zwall_max], [rwall_max, zwall_max], [rwall_max, zwall_min],
[rwall_min, zwall_min]])
newwall_r = []
newwall_z = []
for i in range(-1, 3):
rs = np.linspace(corners[i, 0], corners[i + 1, 0], 20)
zs = np.linspace(corners[i, 1], corners[i + 1, 1], 20)
newwall_r += list(rs)
newwall_z += list(zs)
self._first_wall = np.stack((newwall_r, newwall_z)).T
else:
self._first_wall = first_wall
self._first_wall = self._first_wall[~np.isnan(self._first_wall).any(axis=1)]
if 'time' in basedata:
self.time = basedata['time'].values
else:
self.time = -1
if 'time_unit' in basedata:
self.time_unit = basedata['time_unit']
else:
self.time_unit = "ms"
if 'shot' in basedata:
# Shot number will be strictly integer
self.shot = int(basedata['shot'])
else:
self.shot = 0
# todo: other machine-related information
self.R_min = np.min(r)
self.R_max = np.max(r)
self.Z_min = np.min(z)
self.Z_max = np.max(z)
# TODO: allow FFprime, ffprime, pprime and other on the input
psi_n = basedata.psi_n.values
pressure = None
pprime = None
if 'pprime' in basedata:
pprime = basedata.pprime.values
if 'pressure' in basedata:
pressure = basedata.pressure.values
self.F0 = None
# Try to find F0 in basedata:
if 'F0' in basedata:
self.F0 = basedata['F0']
if isinstance(self.F0, xarray.DataArray):
self.F0 = np.asarray(self.F0.values).item()
elif 'F0' in basedata.attrs:
self.F0 = basedata.attrs['F0']
F = None
FFprime = None
if 'FFprime' in basedata:
FFprime = basedata.FFprime.values
if 'F' in basedata:
F = basedata.F.values
# Other attempts to identify F0:
if self.F0 is None:
if F is not None:
self.F0 = F[-1]
elif 'B0' in basedata and 'R0' in basedata:
self.F0 = basedata['B0'] * basedata['R0']
elif 'B0' in basedata.attrs and 'R0' in basedata.attrs:
self.F0 = basedata.attrs['B0'] * basedata.attrs['R0']
# ---------------------------
# --- Generate psi spline ---
# ---------------------------
if verbose:
print('--- Generate 2D spline ---')
spl = RectBivariateSpline(r, z, psi, kx=spline_order, ky=spline_order,
s=spline_smooth)
self._spl_psi = spl
# -------------------------------
# ---- Find critical points -----
# -------------------------------
if verbose:
print('--- Looking for critical points ---')
x_points, o_points = eq_tools.find_extremes(r, z, self._spl_psi, order=find_extremes_order)
# TODO: Raise warning if no o_point was found!
r_lim = (self.R_min, self.R_max)
z_lim = (self.Z_min, self.Z_max)
self._mg_axis, sortidx = eq_tools.recognize_mg_axis(o_points, self._spl_psi, r_lim, z_lim,
first_wall=self._first_wall,
mg_axis_candidate=self._mg_axis)
self._psi_axis = np.asarray(self._spl_psi(self._mg_axis[0], self._mg_axis[1], grid=False)).item()
self._o_points = o_points[sortidx]
self._o_points[0] = self._mg_axis
# ------------------------------------------
# Recognize x-point plasma vs limiter plasma
# ------------------------------------------
if verbose:
print('--- Recognizing equilibrium type ---')
# todo: use these two x-points in the future
(xp1, xp2), sortidx = eq_tools.recognize_x_points(x_points, self._mg_axis, self._psi_axis,
self._spl_psi,
r_lim, z_lim, self._psi_lcfs, self._x_points)
self._x_point = xp1
self._x_point2 = xp2
if xp1 is None:
self._psi_xp = None
else:
self._psi_xp = self._spl_psi(*xp1, grid=False)
self._x_points = x_points[sortidx]
if xp1 is not None:
self._x_points[0] = xp1
if xp2 is not None:
self._x_points[1] = xp2
limiter_plasma, limiter_point = eq_tools.recognize_plasma_type(self._x_point, self._first_wall,
self._mg_axis, self._psi_axis,
self._spl_psi)
self._limiter_plasma = limiter_plasma
self._limiter_point = limiter_point
if self._verbose:
if limiter_plasma:
print(">> Limiter plasma found.")
else:
print(">> X-point plasma found.")
self._psi_lcfs = self._spl_psi(*limiter_point, grid=False)
# -----------------------
# --- Plasma boundary ---
# -----------------------
rs = np.linspace(self.R_min, self.R_max, 700)
zs = np.linspace(self.Z_min, self.Z_max, 1200)
if limiter_plasma:
self._strike_points = self._limiter_point[np.newaxis, :]
self._contact_point = self._limiter_point
else:
self._contact_point = None
if len(self._first_wall) < 4:
self._strike_points = None
else:
self._strike_points = eq_tools.find_strike_points(self._spl_psi, rs, zs, self._psi_lcfs,
self._first_wall)
if self._verbose:
print("--- Looking for LCFS: ---")
# sometimes this close_lcfs is empty - investigate!
close_lcfs = eq_tools.find_close_lcfs(self._psi_lcfs, rs, zs, self._spl_psi,
self._mg_axis, self._psi_axis)
while surf.fluxsurf_error(self._spl_psi, close_lcfs, self._psi_lcfs) > 1e-10:
close_lcfs = eq_tools.find_surface_step(self._spl_psi, self._psi_lcfs, close_lcfs)
if self._verbose:
print("Relative LCFS error: {}".format(
surf.fluxsurf_error(self._spl_psi, close_lcfs, self._psi_lcfs)))
if not limiter_plasma:
close_lcfs = surf.add_xpoint(xp1, close_lcfs, self._mg_axis)
self._lcfs = close_lcfs
# generate 1d profiles:
if self._psi_lcfs - self._psi_axis > 0:
self._psi_sign = +1
else:
self._psi_sign = -1
Fprime = None
if FFprime is not None:
F = eq_tools.ffprime2f(FFprime, self._psi_axis, self._psi_lcfs, self.F0)
Fprime = FFprime / F
if pprime is not None:
pressure = eq_tools.pprime2p(pprime, self._psi_axis, self._psi_lcfs)
self.BvacR = self.F0
# if p and F are not define, run vacuum-like discharge:
self._vacuum = False
if pressure is None or F is None:
pressure = np.zeros_like(psi_n)
F = np.zeros_like(psi_n)
self._vacuum = True
if verbose:
print('--- Generate 1D splines ---')
self._fpol_spl = UnivariateSpline(psi_n, F, k=3, s=0)
if FFprime is None:
self._df_dpsin_spl = self._fpol_spl.derivative()
Fprime = self._df_dpsin_spl(psi_n) / (self._psi_lcfs - self._psi_axis)
FFprime = F * Fprime
self._pressure_spl = UnivariateSpline(psi_n, pressure, k=3, s=0)
if pprime is None:
self._dp_dpsin_spl = self._pressure_spl.derivative()
pprime = self._dp_dpsin_spl(psi_n) / (self._psi_lcfs - self._psi_axis)
self._pprime_spl = UnivariateSpline(psi_n, pprime, k=3, s=0)
self._Fprime_spl = UnivariateSpline(psi_n, Fprime, k=3, s=0)
self._FFprime_spl = UnivariateSpline(psi_n, FFprime, k=3, s=0)
self.fluxfuncs.add_flux_func('F', F, psi_n=psi_n)
self.fluxfuncs.add_flux_func('FFprime', FFprime, psi_n=psi_n)
self.fluxfuncs.add_flux_func('pressure', pressure, psi_n=psi_n)
self.fluxfuncs.add_flux_func('pprime', pprime, psi_n=psi_n)
if verbose:
print('--- Mapping midplane to psi_n ---')
self._map_midplane2psi()
if verbose:
print('--- Mapping pressure and f func to psi_n ---')
except:
from pleque.utils.plotting import _plot_debug
import matplotlib.pyplot as plt
plt.figure()
_plot_debug(self)
plt.show()
raise
[docs]
@scalar_function
def psi(self, *coordinates, R=None, Z=None, psi_n=None, coord_type=None, grid=True, **coords):
"""
Psi value
:param psi_n:
:param coordinates:
:param R:
:param Z:
:param coord_type:
:param grid:
:param coords:
:return:
"""
coord = self.coordinates(*coordinates, R=R, Z=Z, psi_n=psi_n, coord_type=coord_type, grid=grid, **coords)
return coord.psi
[docs]
@vector_function(ndim=2)
def nabla_psi(self, *coordinates, R=None, Z=None, psi_n=None, coord_type=None, grid=False, **coords) -> np.ndarray:
r"""
Return the value of :math:`\nabla \psi`.
:return: Array of shape (2, ...) containing the gradient components [dψ/dR, dψ/dZ].
If grid=True, the shape will be (2, nZ, nR).
"""
coord = self.coordinates(*coordinates, R=R, Z=Z, psi_n=psi_n, coord_type=coord_type, grid=grid, **coords)
dpsi_dr = self._shape_spline_result(self._spl_psi(coord.R, coord.Z, grid=coord.grid, dx=1), coord.grid)
dpsi_dz = self._shape_spline_result(self._spl_psi(coord.R, coord.Z, grid=coord.grid, dy=1), coord.grid)
nabla_psi = np.stack((dpsi_dr, dpsi_dz))
return nabla_psi
[docs]
@scalar_function
def diff_psi(self, *coordinates, R=None, Z=None, psi_n=None, coord_type=None, grid=False, **coords):
r"""
Return the value of :math:`\pm|\nabla \psi|`. It is positive/negative if the :math:`\psi` is increasing/decreasing.
:param coordinates:
:param R:
:param Z:
:param psi_n:
:param coord_type:
:param grid:
:param coords:
:return:
"""
coord = self.coordinates(*coordinates, R=R, Z=Z, psi_n=psi_n, coord_type=coord_type, grid=grid, **coords)
ret = np.sqrt(self._spl_psi(coord.R, coord.Z, grid=coord.grid, dx=1) ** 2 +
self._spl_psi(coord.R, coord.Z, grid=coord.grid, dy=1) ** 2)
ret *= np.sign(self._psi_lcfs - self._psi_axis)
if coord.grid:
ret = ret.T
return ret
[docs]
@scalar_function
def psi_n(self, *coordinates, R=None, Z=None, psi=None, coord_type=None, grid=True, **coords):
coord = self.coordinates(*coordinates, R=R, Z=Z, psi=psi, coord_type=coord_type, grid=grid, **coords)
return coord.psi_n
[docs]
@scalar_function
def r_mid(self, *coordinates, R=None, Z=None, psi_n=None, coord_type=None, grid=True, **coords):
coord = self.coordinates(*coordinates, R=R, Z=Z, psi_n=psi_n, coord_type=coord_type, grid=grid, **coords)
# todo: map the r_mid to psi_n
return self._rmid_spl(coord.psi)
@property
def _diff_psi_n(self):
"""
psi_2 - psi_1 = (psi_n_2 - psi_n_1)*1/_diff_psi_n
:return: Scaling parameter between psi_n and psi
"""
return 1 / (self._psi_lcfs - self._psi_axis)
[docs]
@scalar_function
def rho(self, *coordinates, R=None, Z=None, psi_n=None, coord_type=None, grid=True, **coords):
coord = self.coordinates(*coordinates, R=R, Z=Z, psi_n=psi_n, coord_type=coord_type, grid=grid, **coords)
return np.sqrt(coord.psi_n)
[docs]
@scalar_function
def pressure(self, *coordinates, R=None, Z=None, psi_n=None, coord_type=None, grid=True, **coords):
coord = self.coordinates(*coordinates, R=R, Z=Z, psi_n=psi_n, coord_type=coord_type, grid=grid, **coords)
return self._pressure_spl(coord.psi_n)
[docs]
@scalar_function
def pprime(self, *coordinates, R=None, Z=None, psi_n=None, coord_type=None, grid=True, **coords):
coord = self.coordinates(*coordinates, R=R, Z=Z, psi_n=psi_n, coord_type=coord_type, grid=grid, **coords)
return self._pprime_spl(coord.psi_n)
[docs]
@scalar_function
def f(self, *coordinates, R=None, Z=None, psi_n=None, coord_type=None, grid=True, **coords):
coord = self.coordinates(*coordinates, R=R, Z=Z, psi_n=psi_n, coord_type=coord_type, grid=grid, **coords)
return self.F(coord) / mu_0
[docs]
@scalar_function
def F(self, *coordinates, R=None, Z=None, psi_n=None, coord_type=None, grid=True, **coords):
coord = self.coordinates(*coordinates, R=R, Z=Z, psi_n=psi_n, coord_type=coord_type, grid=grid, **coords)
# todo use in_plasma
mask_out = coord.psi_n > 1
F = self._fpol_spl(coord.psi_n)
F[mask_out] = self.BvacR
return F
[docs]
@scalar_function
def Fprime(self, *coordinates, R=None, Z=None, psi_n=None, coord_type=None, grid=True, **coords):
'''
:param coordinates:
:param R:
:param Z:
:param psi_n:
:param coord_type:
:param grid:
:param coords:
:return:
'''
coord = self.coordinates(*coordinates, R=R, Z=Z, psi_n=psi_n, coord_type=coord_type, grid=grid, **coords)
mask_out = coord.psi_n > 1
Fprime = self._Fprime_spl(coord.psi_n)
Fprime[mask_out] = 0
return Fprime
[docs]
@scalar_function
def ffprime(self, *coordinates, R=None, Z=None, psi_n=None, coord_type=None, grid=True, **coords):
coord = self.coordinates(*coordinates, R=R, Z=Z, psi_n=psi_n, coord_type=coord_type, grid=grid, **coords)
return self.FFprime(coord) / mu_0 ** 2
[docs]
@scalar_function
def FFprime(self, *coordinates, R=None, Z=None, psi_n=None, coord_type=None, grid=True, **coords):
coord = self.coordinates(*coordinates, R=R, Z=Z, psi_n=psi_n, coord_type=coord_type, grid=grid, **coords)
mask_out = coord.psi_n > 1
FFprime = self._fpol_spl(coord.psi_n) * self._Fprime_spl(coord.psi_n)
FFprime[mask_out] = 0
return FFprime
[docs]
@scalar_function
def B_abs(self, *coordinates, R=None, Z=None, coord_type=None, grid=True, **coords):
"""
Absolute value of magnetic field in Tesla.
:param grid:
:param coordinates:
:param R:
:param Z:
:param coord_type:
:param coords:
:return: Absolute value of magnetic field in Tesla.
"""
coord = self.coordinates(*coordinates, R=R, Z=Z, coord_type=coord_type, grid=grid, **coords)
B_R = self.B_R(coord)
B_Z = self.B_Z(coord)
B_T = self.B_tor(coord)
B_abs = np.sqrt(B_R ** 2 + B_Z ** 2 + B_T ** 2)
return B_abs
[docs]
@vector_function(ndim=3)
def Bvec(self, *coordinates, swap_order=False, R=None, Z=None, coord_type=None, grid=True, **coords):
""" Magnetic field vector
:param grid:
:param coordinates:
:param swap_order: If False, return component-first shape ``(3, ...)``.
If True, move the component axis to the end for compatibility.
:param R:
:param Z:
:param coord_type:
:param coords:
:return: Magnetic field vector array. Component-first shape is ``(3, n_elements)``
for paired points and ``(3, n_z, n_r)`` for grids.
"""
coord = self.coordinates(*coordinates, R=R, Z=Z, coord_type=coord_type, grid=grid, **coords)
bR = self.B_R(coord)
bz = self.B_Z(coord)
btor = self.B_tor(coord)
bvec = np.stack((bR, bz, btor))
if not swap_order:
return bvec
else:
return np.moveaxis(bvec, 0, -1)
[docs]
@vector_function(ndim=3)
def Bvec_norm(self, *coordinates, swap_order=False, R=None, Z=None, coord_type=None, grid=True, **coords):
""" Magnetic field vector, normalised
:param grid:
:param coordinates:
:param swap_order: If False, return component-first shape ``(3, ...)``.
If True, move the component axis to the end for compatibility.
:param R:
:param Z:
:param coord_type:
:param coords:
:return: Normalised magnetic field vector array. Component-first shape is
``(3, n_elements)`` for paired points and ``(3, n_z, n_r)`` for grids.
"""
coord = self.coordinates(*coordinates, R=R, Z=Z, coord_type=coord_type, grid=grid, **coords)
bR = self.B_R(coord)
bz = self.B_Z(coord)
btor = self.B_tor(coord)
bvec = np.stack((bR, bz, btor))
bvec_n = bvec / np.linalg.norm(bvec, axis=0)
if not swap_order:
return bvec_n
else:
return np.moveaxis(bvec_n, 0, -1)
# XXXXXX TODO TODO TODO
[docs]
@deprecated('The structure and behaviour of this function will change soon!\n'
'to keep the same behaviour use `_flux_surface` instead.')
def flux_surface(self, *coordinates, resolution=(1e-3, 1e-3), dim="step",
closed=True, inlcfs=True, R=None, Z=None, psi_n=None,
coord_type=None, **coords):
return self._flux_surface(*coordinates, resolution=resolution, dim=dim,
closed=closed, inlcfs=inlcfs, R=R, Z=Z, psi_n=psi_n,
coord_type=coord_type, **coords)
def _flux_surface(self, *coordinates, resolution=None, dim="step",
closed=None, inlcfs=True, R=None, Z=None, psi_n=None,
coord_type=None, **coords):
"""
Function which finds flux surfaces with requested values of psi or psi-normalized. Specification of the
fluxsurface properties as if it is inside last closed flux surface or if the surface is supposed to be
closed are possible.
:param R:
:param Z:
:param psi_n:
:param coord_type:
:param coordinates: specifies flux surface to search for (by spatial point or values of psi or psi normalised).
If coordinates is spatial point (dim=2) then parameters closed and lcfs are automatically overridden.
Coordinates.grid must be False.
:param resolution: Iterable of size 2 or a number, default is [1e-3, 1e-3]. If a number is passed,
R and Z dimensions will have the same size or step (depending on dim parameter). Different R and Z
resolutions or dimension sizes can be required by passing an iterable of size 2
:param dim: iterable of size 2 or string. Default is "step", determines the meaning of the resolution.
If "step" used, values in resolution are interpreted as step length in psi poloidal map. If "size" is used,
values in resolution are interpreted as requested number of points in a dimension. If string is passed,
same value is used for R and Z dimension. Different interpretation of resolution for R, Z dimensions can be
achieved by passing an iterable of shape 2.
:param closed: Are we looking for a closed surface. This parameter is ignored of inlcfs is True.
:param inlcfs: If True only the surface inside the last closed flux surface is returned.
:return: list of FluxSurface objects
"""
coordinates = self.coordinates(*coordinates, R=R, Z=Z, psi_n=psi_n, coord_type=coord_type, **coords)
# get the grid for psi map to find the contour in.
# todo: this is, at the moment, slowest part of the code
grid = self.grid(resolution=resolution, dim=dim)
# todo: to get lcfs, here is small trick. This should be handled better
# otherwise it may return crossed loop
if np.isclose(coordinates.psi_n[0], 1) and inlcfs:
psi_n = 1 - 1e-5
else:
psi_n = coordinates.psi_n[0]
# create coordinates
# coords = self.coordinates(R=R, Z=Z, grid=True, coord_type=["R", "Z"])
# get the coordinates of the contours with requested leve and convert them into
# instances of FluxSurface class
contour = self._get_surface(grid, level=psi_n, norm=True)
for i in range(len(contour)):
contour[i] = self._as_fluxsurface(contour[i])
# get the position of the magnetic axis, which is used to determine whether the found fluxsurfaces are
# within the lcfs
magaxis = self.coordinates(np.expand_dims(self._mg_axis, axis=0))
# find fluxsurfaces with requested parameters
fluxsurface = []
if coordinates.dim == 1:
for i in range(len(contour)):
if inlcfs and contour[i].closed and contour[i].contains(magaxis):
fluxsurface.append(contour[i])
return fluxsurface
# generally this logic is quite odd; however, if we specify closed=None, it should add the contour
# whatsoever
elif not inlcfs and closed is None:
fluxsurface.append(contour[i])
elif not inlcfs and (closed is True) and contour[i].closed:
fluxsurface.append(contour[i])
elif not inlcfs and (closed is False) and not contour[i].closed:
fluxsurface.append(contour[i])
elif coordinates.dim == 2:
# Sadly contour des not go through the point due to mesh resolution :-(
dist = np.inf
tmp2 = None
for i in range(len(contour)):
tmp = contour[i].distance(coordinates)
if tmp < dist:
dist = tmp
tmp2 = contour[i]
fluxsurface.append(tmp2)
return fluxsurface
[docs]
@scalar_function
def poloidal_mag_flux_exp_coef(self, *coordinates, R=None, Z=None, coord_type=None, grid=True, **coords):
r"""
**Poloidal magnetic flux expansion coefficient.**
**Definition:**
.. math::
f_\mathrm{pol} = \frac{\Delta r^\mathrm{t}}{\Delta r^\mathrm{u}} =
\frac{B_\theta^\mathrm{u} R^\mathrm{u}}{B_\theta^\mathrm{t} R^\mathrm{t}}
**Typical usage:**
*Poloidal magnetic flux expansion coefficient* is typically used for :math:`\lambda` scaling
in plane perpendicular to the poloidal component of the magnetic field.
:param coordinates:
:param R:
:param Z:
:param coord_type:
:param grid:
:param coords:
:return:
"""
coords = self.coordinates(*coordinates, R=R, Z=Z, coord_type=coord_type, grid=grid, **coords)
return flux_expansion.poloidal_mag_flux_exp_coef(self, coords)
[docs]
@ordered_path_scalar_function
def effective_poloidal_mag_flux_exp_coef(self, *coordinates, R=None, Z=None, coord_type=None, grid=True, **coords):
r"""
**Effective poloidal magnetic flux expansion coefficient**
**Definition:**
.. math::
f_\mathrm{pol, eff} = \frac{B_\theta^\mathrm{u} R^\mathrm{u}}{B_\theta^\mathrm{t} R^\mathrm{t}}
\frac{1}{\sin \beta} = \frac{f_\mathrm{pol}}{\sin \beta}
Where :math:`\beta` is inclination angle of the poloidal magnetic field and the target plane.
**Typical usage:**
*Effective magnetic flux expansion coefficient* is typically used for :math:`\lambda` scaling
of the target :math:`\lambda` with respect to the upstream value.
.. math::
\lambda^\mathrm{t} = \lambda^\mathrm{u} f_{\mathrm{pol, eff}}
:param coordinates:
:param R:
:param Z:
:param coord_type:
:param grid:
:param coords:
:return:
"""
coords = self.coordinates(*coordinates, R=R, Z=Z, coord_type=coord_type, grid=grid, **coords)
return flux_expansion.effective_poloidal_mag_flux_exp_coef(self, coords)
[docs]
@scalar_function
def poloidal_heat_flux_exp_coef(self, *coordinates, R=None, Z=None, coord_type=None, grid=True, **coords):
r"""
**Poloidal heat flux expansion coefficient**
**Definition:**
.. math::
f_\mathrm{pol, heat} = \frac{B_\theta^\mathrm{u}}{B_\theta^\mathrm{t}}
**Typical usage:**
*Poloidal heat flux expansion coefficient* is typically used to scale poloidal heat flux
(heat flux projected along poloidal magnetic field) along the magnetic field line.
.. math::
q_\theta^\mathrm{t} = \frac{q_\theta^\mathrm{u}}{f_{\mathrm{pol, heat}}}
:param coordinates:
:param R:
:param Z:
:param coord_type:
:param grid:
:param coords:
:return:
"""
coords = self.coordinates(*coordinates, R=R, Z=Z, coord_type=coord_type, grid=grid, **coords)
return flux_expansion.poloidal_heat_flux_exp_coef(self, coords)
[docs]
@ordered_path_scalar_function
def effective_poloidal_heat_flux_exp_coef(self, *coordinates, R=None, Z=None, coord_type=None, grid=True, **coords):
r"""
**Effective poloidal heat flux expansion coefficient**
**Definition:**
.. math::
f_\mathrm{pol, heat, eff} = \frac{B_\theta^\mathrm{u}}{B_\theta^\mathrm{t}}
\frac{1}{\sin \beta} = \frac{f_\mathrm{pol}}{\sin \beta}
Where :math:`\beta` is inclination angle of the poloidal magnetic field and the target plane.
**Typical usage:**
*Effective poloidal heat flux expansion coefficient* is typically used scale upstream poloidal
heat flux to the target plane.
.. math::
q_\perp^\mathrm{t} = \frac{q_\theta^\mathrm{u}}{f_{\mathrm{pol, heat, eff}}}
:param coordinates:
:param R:
:param Z:
:param coord_type:
:param grid:
:param coords:
:return:
"""
coords = self.coordinates(*coordinates, R=R, Z=Z, coord_type=coord_type, grid=grid, **coords)
return flux_expansion.effective_poloidal_heat_flux_exp_coef(self, coords)
[docs]
@scalar_function
def parallel_heat_flux_exp_coef(self, *coordinates, R=None, Z=None, coord_type=None, grid=True, **coords):
r"""
**Parallel heat flux expansion coefficient**
**Definition:**
.. math::
f_\parallel= \frac{B^\mathrm{u}}{B^\mathrm{t}}
**Typical usage:**
*Parallel heat flux expansion coefficient* is typically used to scale total upstream heat flux
parallel to the magnetic field along the magnetic field lines.
.. math::
q_\parallel^\mathrm{t} = \frac{q_\parallel^\mathrm{u}}{f_\parallel}
:param coordinates:
:param R:
:param Z:
:param coord_type:
:param grid:
:param coords:
:return:
"""
coords = self.coordinates(*coordinates, R=R, Z=Z, coord_type=coord_type, grid=grid, **coords)
return flux_expansion.parallel_heat_flux_exp_coef(self, coords)
[docs]
@ordered_path_scalar_function
def total_heat_flux_exp_coef(self, *coordinates, R=None, Z=None, coord_type=None, grid=True, **coords):
r"""
**Total heat flux expansion coefficient**
**Definition:**
.. math::
f_\mathrm{tot} = \frac{B^\mathrm{u}}{B^\mathrm{t}} \frac{1}{\sin \alpha} =
\frac{f_\parallel}{\sin \alpha}
Where :math:`\alpha` is inclination angle of the total magnetic field and the target plane.
**Typical usage:**
*Total heat flux expansion coefficient* is typically used to project total upstream heat flux
parallel to the magnetic field to the target plane.
.. math::
q_\perp^\mathrm{t} = \frac{q_\parallel^\mathrm{u}}{f_{\mathrm{tot}}}
:param coordinates:
:param R:
:param Z:
:param coord_type:
:param grid:
:param coords:
:return:
"""
coords = self.coordinates(*coordinates, R=R, Z=Z, coord_type=coord_type, grid=grid, **coords)
return flux_expansion.total_heat_flux_exp_coef(self, coords)
[docs]
@deprecated('This function was not written correctly and with the state of the knowlige and'
'nobody is allowed to use it! It has been or will be replaced by the new functions.')
def outter_parallel_fl_expansion_coef(self, *coordinates, R=None, Z=None, coord_type=None, grid=True, **coords):
"""
WIP:Calculate parallel expansion coefitient of the given coordinates with respect to positon on the outer
midplane.
"""
target = self.coordinates(*coordinates, R=R, Z=Z, coord_type=coord_type, grid=grid, **coords)
# print(coord.r_mid)
B_midplane = self.B_abs(r=target.r_mid, theta=np.zeros_like(target.r_mid), grid=False)
B_coord = self.B_abs(target)
return B_coord / B_midplane
[docs]
@deprecated('This function was not written correctly and with the state of the knowlige and'
'nobody is allowed to use it! It has been or will be replaced by the new functions.')
def outter_poloidal_fl_expansion_coef(self, *coordinates, R=None, Z=None, coord_type=None, grid=True, **coords):
"""
WIP:Calculate parallel expansion coefitient of the given coordinates with respect to positon on the outer
midplane.
"""
target = self.coordinates(*coordinates, R=R, Z=Z, coord_type=coord_type, grid=grid, **coords)
# print(coord.r_mid)
B_midplane = self.B_pol(r=target.r_mid, theta=np.zeros_like(target.r_mid), grid=False)
B_coord = self.B_pol(target)
return B_coord / B_midplane
def _get_surface(self, *coordinates, R=None, Z=None, level=0.5, norm=True, coord_type=None, **coords):
"""
finds contours
:return: list of coordinates of contours on a requested level
"""
from pleque.utils.surfaces import find_contour
coordinates = self.coordinates(*coordinates, R=R, Z=Z, grid=True, coord_type=coord_type, **coords)
if norm:
contour = find_contour(coordinates.psi_n, level=level, r=coordinates.R, z=coordinates.Z)
else:
contour = find_contour(coordinates.psi, level=level, r=coordinates.R, z=coordinates.Z)
for i in range(len(contour)):
contour[i] = Coordinates(self, contour[i])
return contour
[docs]
def plot_geometry(self, axs=None, **kwargs):
"""
Plots the the directions of angles, current and magnetic field.
:param axs: None or tuple of axes. If None new figure with to axes is created.
:param kwargs: parameters passed to the `plot` routine.
:return: tuple of axis (ax1, ax2)
"""
import matplotlib.pyplot as plt
if axs is None:
fig, axs = plt.subplots(1, 2)
fw = self.first_wall
if len(fw) < 4:
print('Warning: first wall is not sufficient. LCFS is used instead.')
fw = self.lcfs
R_min = np.min(fw.R)
R_max = np.max(fw.R)
R_mid = (R_min + R_max) / 2
sig_ip = np.sign(self.I_plasma)
sig_bt = np.sign(self.F0)
#############
# Top view: #
#############
ax1 = axs[0]
ax1.set_title('Top view')
phis = np.linspace(0, 2 * np.pi, endpoint=True)
ax1.plot(R_min * np.cos(phis), R_min * (np.sin(phis)), 'k-')
ax1.plot(R_max * np.cos(phis), R_max * (np.sin(phis)), 'k-')
phi_dir = self.coordinates(R=R_mid, Z=0, phi=np.pi / 8)
phi_dir_mg = self.coordinates(R=R_mid, Z=0, phi=np.pi + sig_bt * np.pi / 8)
phi_dir_ip = self.coordinates(R=R_mid, Z=0, phi=3 * np.pi / 2 + sig_ip * np.pi / 8)
ax1.arrow(R_mid, 0, phi_dir.X[0] - R_mid, phi_dir.Y[0],
width=0.005, length_includes_head=True, head_width=0.05)
ax1.text(R_mid + R_mid / 14, 0, r'$\phi$',
ha='left', va='center')
ax1.arrow(-R_mid, 0, phi_dir_mg.X[0] + R_mid, phi_dir_mg.Y[0],
width=0.005, length_includes_head=True, head_width=0.05)
ax1.text(- (R_mid + R_mid / 14), 0, r'$B_\phi$',
ha='right', va='center')
ax1.arrow(0, -R_mid, phi_dir_ip.X[0], phi_dir_ip.Y[0] + R_mid,
width=0.005, length_includes_head=True, head_width=0.05)
ax1.text(0, - (R_mid + R_mid / 14), r'$j_\phi$',
ha='center', va='top')
ax1.set_aspect('equal')
ax1.set_xlabel('X [m]')
ax1.set_ylabel('Y [m]')
###########################
# Poloidal cross section: #
###########################
r0 = (R_max - self.magnetic_axis.R[0]) * 0.6
pos0 = self.coordinates(r=r0, theta=0)
theta0 = 0
theta_dir = self.coordinates(r=r0, theta=theta0 + np.pi / 8)
r1 = (self.magnetic_axis.R[0] - R_min) * 0.7
r2 = (self.magnetic_axis.R[0] - R_min) * 0.45
theta1 = theta2 = np.pi
pos1 = self.coordinates(r=r1, theta=theta1)
pos2 = self.coordinates(r=r2, theta=theta2)
sig_theta = self._cocosdic['sigma_pol'] * self._cocosdic['sigma_cyl']
sig_bpol = sig_theta * np.sign(self.B_Z(pos1))
sig_jpol = sig_theta * np.sign(self.j_Z(pos2))
theta_dir_bpol = self.coordinates(r=pos1.r[0], theta=pos1.theta[0] + sig_bpol * np.pi / 8)
theta_dir_jpol = self.coordinates(r=pos2.r[0], theta=pos2.theta[0] + sig_jpol * np.pi / 8)
ax2 = axs[1]
ax2.set_title("Poloidal cross section")
ax2.plot(fw.R, fw.Z, 'k-')
if fw is not self.lcfs:
ax2.plot(self.lcfs.R, self.lcfs.Z, 'C0--')
ax2.plot(self.magnetic_axis.R, self.magnetic_axis.Z, 'C0o')
ax2.arrow(pos0.R[0], pos0.Z[0], theta_dir.R[0] - pos0.R[0], theta_dir.Z[0] - pos0.Z[0],
width=0.005, head_width=0.03)
ax2.text(pos0.R[0] + r0 / 14, pos0.Z[0], r'$\theta$',
ha='left', va='center')
ax2.arrow(pos1.R[0], pos1.Z[0], theta_dir_bpol.R[0] - pos1.R[0], theta_dir_bpol.Z[0] - pos1.Z[0],
width=0.005, head_width=0.03)
ax2.text(pos1.R[0] - r0 / 14, pos1.Z[0], r'$B_\theta$',
ha='right', va='center')
ax2.arrow(pos2.R[0], pos2.Z[0], theta_dir_jpol.R[0] - pos2.R[0], theta_dir_jpol.Z[0] - pos2.Z[0],
width=0.005, head_width=0.03)
ax2.text(pos2.R[0] + r0 / 14, pos2.Z[0], r'$j_\theta$',
ha='left', va='center')
ax2.set_aspect('equal')
ax2.set_xlabel('R [m]')
ax2.yaxis.set_label_position("right")
ax2.set_ylabel('Z [m]')
return axs
[docs]
def plot_overview(self, ax=None, **kwargs):
"""
Simple routine for plot of plasma overview
:return:
"""
return self._plot_overview(ax, **kwargs)
def _plot_overview(self, ax=None, **kwargs):
"""
Simple routine for plot of plasma overview
:return:
"""
from pleque.utils.plotting import plot_equilibrium
# plt.figure()
return plot_equilibrium(self, ax=ax, **kwargs)
[docs]
def invert_equilibrium(self, vertically: bool = False, first_wall: bool = False, toroidal_field: bool = False,
current: bool = False) -> 'Equilibrium':
""" Return a new instance of the `pleque.Equilibrium` with opposite value of selected property/quantity.
Parameters
----------
vertically : bool, optional
If True, invert the psi map vertically (flip along Z axis). Default is False.
first_wall : bool, optional
If True, invert the first wall vertically. Default is False.
toroidal_field : bool, optional
If True, invert the toroidal field. Default is False.
current : bool, optional
If True, invert the current. Default is False.
Note
----
To study the effect of the inverted equilibrium to the upper divertor, to keep the (un)favourable drifts,
one has to invert all `vertically`, `toroidal_field` and `current` quantities. `
Returns
-------
pleque.Equilibrium
A new instance of Equilibrium with inverted properties.
"""
# Create a deep copy of the basedata
new_basedata = copy.deepcopy(self._basedata)
# Invert psi map vertically if requested
if vertically:
# Flip the psi values along the Z dimension using xarray indexing
new_z = - new_basedata.Z.values[::-1]
new_basedata.coords["Z"] = xarray.DataArray(new_z, dims=("Z",))
new_psi = new_basedata.psi.transpose("R", "Z").values[:, ::-1]
new_basedata["psi"] = xarray.DataArray(new_psi, dims=("R", "Z"))
# Handle the first wall inversion if requested
new_first_wall = None
if first_wall and hasattr(self, '_first_wall') and self._first_wall is not None:
# Create a copy of the first wall
new_first_wall = np.copy(self._first_wall)
# Invert the Z coordinates (second column)
z_min = self._basedata.Z.min().item()
z_max = self._basedata.Z.max().item()
new_first_wall[:, 1] = z_max + z_min - new_first_wall[:, 1]
# Invert toroidal field if requested
if toroidal_field:
# The toroidal field is determined by the F function and F0
# Invert the F function in the basedata before creating the new Equilibrium
if 'F' in new_basedata:
new_basedata['F'] = -new_basedata['F']
# Also invert F0 if it exists
if 'F0' in new_basedata:
new_basedata['F0'] = -new_basedata['F0']
elif 'F0' in new_basedata.attrs:
new_basedata.attrs['F0'] = -new_basedata.attrs['F0']
# if "FFprime" in new_basedata:
# new_basedata["FFprime"] = -new_basedata["FFprime"]
# Invert current if requested
if current:
# The current is determined by the derivatives of psi and F
# Invert the pprime and FFprime functions if they exist
new_basedata["psi"] = -new_basedata["psi"]
if 'pprime' in new_basedata:
new_basedata['pprime'] = -new_basedata['pprime']
if 'FFprime' in new_basedata:
new_basedata['FFprime'] = -new_basedata['FFprime']
# Create a new Equilibrium with the modified basedata
new_equi = Equilibrium(
basedata=new_basedata,
first_wall=new_first_wall if first_wall else self._first_wall,
mg_axis=self._mg_axis,
psi_lcfs=self._psi_lcfs,
x_points=self._x_points,
strike_points=self._strike_points,
init_method=self._init_method,
spline_order=self._spline_order,
cocos=self._cocos,
verbose=self._verbose
)
return new_equi
[docs]
def grid(self, resolution=None, dim="step"):
"""
Function which returns 2d grid with requested step/dimensions generated over the reconstruction space.
:param resolution: Iterable of size 2 or a number. If a number is passed,
R and Z dimensions will have the same size or step (depending on dim parameter). Different R and Z
resolutions or dimension sizes can be required by passing an iterable of size 2.
If None, default grid of size (1000, 2000) is returned.
:param dim: iterable of size 2 or string ('step', 'size'). Default is "step", determines the meaning
of the resolution.
If "step" used, values in resolution are interpreted as step length in psi poloidal map. If "size" is used,
values in resolution are interpreted as requested number of points in a dimension. If string is passed,
same value is used for R and Z dimension. Different interpretation of resolution for R, Z dimensions can be
achieved by passing an iterable of shape 2.
:return: Instance of `Coordinates` class with grid data
"""
if resolution is None:
if not hasattr(self, '_default_grid'):
# TODO THIS is slow now. Decrease resolution and then use find_fluxsurface_step (!!!)
R = np.linspace(self._basedata.R.min().item(), self._basedata.R.max().item(), 1000)
Z = np.linspace(self._basedata.Z.min().item(), self._basedata.Z.max().item(), 2000)
self._default_grid = self.coordinates(R=R, Z=Z, grid=True)
return self._default_grid
else:
if isinstance(resolution, Sequence):
if not len(resolution) == 2:
raise ValueError("if iterable, resolution has to be of size 2")
res_R = resolution[0]
res_Z = resolution[1]
else:
res_R = resolution
res_Z = resolution
if isinstance(dim, Sequence) and len(dim) == 2:
if res_R is None:
R = self._basedata.R.values
elif dim[0] == "step":
R = np.arange(self._basedata.R.min().item(), self._basedata.R.max().item(), res_R)
elif dim[0] == "size":
R = np.linspace(self._basedata.R.min().item(), self._basedata.R.max().item(), res_Z)
else:
raise ValueError("Wrong dim[0] value passed")
if res_Z is None:
Z = self._basedata.Z.values
elif dim[1] == "step":
Z = np.arange(self._basedata.Z.min().item(), self._basedata.R.max().item(), res_R)
elif dim[1] == "size":
Z = np.linspace(self._basedata.Z.min().item(), self._basedata.Z.max().item(), res_Z)
else:
raise ValueError("Wrong dim[1] value passed")
elif isinstance(dim, str):
if dim == "step":
if res_R is None:
R = self._basedata.R.values
else:
R = np.arange(self._basedata.R.min().item(), self._basedata.R.max().item(), res_R)
if res_Z is None:
Z = self._basedata.Z.values
else:
Z = np.arange(self._basedata.Z.min().item(), self._basedata.Z.max().item(), res_Z)
elif dim == "size":
if res_R is None:
R = self._basedata.R.values
else:
R = np.linspace(self._basedata.R.min().item(), self._basedata.R.max().item(), res_R)
if res_Z is None:
Z = self._basedata.Z.values
else:
Z = np.linspace(self._basedata.Z.min().item(), self._basedata.Z.max().item(), res_Z)
else:
raise ValueError("Wrong dim value passed")
else:
raise ValueError("Wrong dim value passed")
coords = self.coordinates(R=R, Z=Z, grid=True)
return coords
[docs]
@scalar_function
def B_R(self, *coordinates, R=None, Z=None, coord_type=('R', 'Z'), grid=True, **coords):
"""
Poloidal value of magnetic field in Tesla.
:param coordinates:
:param R:
:param Z:
:param coord_type:
:param grid:
:param coords:
:return: Scalar array with shape ``(n_elements,)`` for paired points and
``(n_z, n_r)`` for grids.
"""
coord = self.coordinates(*coordinates, R=R, Z=Z, coord_type=coord_type, grid=grid, **coords)
cc_norm = self._cocosdic["sigma_cyl"] * self._cocosdic["sigma_Bp"] * 1 / (2 * np.pi) ** self._cocosdic["exp_Bp"]
dpsi_dz = self._shape_spline_result(self._spl_psi(coord.R, coord.Z, dy=1, grid=coord.grid), coord.grid)
R = self._shape_spline_result(coord.R, coord.grid)
return cc_norm * dpsi_dz / R * self._Bpol_sign
[docs]
def B_R_rz(self, R, Z):
cc_norm = self._cocosdic["sigma_cyl"] * self._cocosdic["sigma_Bp"] * 1 / (2 * np.pi) ** self._cocosdic["exp_Bp"]
return cc_norm * self._spl_psi(R, Z, dy=1, grid=False) / R * self._Bpol_sign
[docs]
@scalar_function
def B_Z(self, *coordinates, R=None, Z=None, coord_type=None, grid=True, **coords):
"""
Poloidal value of magnetic field in Tesla.
:param grid:
:param coordinates:
:param R:
:param Z:
:param coord_type:
:param coords:
:return: Scalar array with shape ``(n_elements,)`` for paired points and
``(n_z, n_r)`` for grids.
"""
coord = self.coordinates(*coordinates, R=R, Z=Z, coord_type=coord_type, grid=grid, **coords)
cc_norm = self._cocosdic["sigma_cyl"] * self._cocosdic["sigma_Bp"] * 1 / (2 * np.pi) ** self._cocosdic["exp_Bp"]
dpsi_dr = self._shape_spline_result(self._spl_psi(coord.R, coord.Z, dx=1, grid=coord.grid), coord.grid)
R = self._shape_spline_result(coord.R, coord.grid)
return - cc_norm * dpsi_dr / R * self._Bpol_sign
[docs]
def B_Z_rz(self, R, Z):
cc_norm = self._cocosdic["sigma_cyl"] * self._cocosdic["sigma_Bp"] * 1 / (2 * np.pi) ** self._cocosdic["exp_Bp"]
return - cc_norm * self._spl_psi(R, Z, dx=1, grid=False) / R * self._Bpol_sign
[docs]
@scalar_function
def B_pol(self, *coordinates, R=None, Z=None, coord_type=None, grid=True, **coords):
"""
Absolute value of magnetic field in Tesla.
:param grid:
:param coordinates:
:param R:
:param Z:
:param coord_type:
:param coords:
:return:
"""
coord = self.coordinates(*coordinates, R=R, Z=Z, coord_type=coord_type, grid=grid, **coords)
B_R = self.B_R(coord)
B_Z = self.B_Z(coord)
B_pol = np.sqrt(B_R ** 2 + B_Z ** 2)
return B_pol
[docs]
@scalar_function
def B_tor(self, *coordinates, R: np.array = None, Z: np.array = None, coord_type=None, grid=True, **coords):
"""
Toroidal value of magnetic field in Tesla.
:param grid:
:param coordinates:
:param R:
:param Z:
:param coord_type:
:param coords:
:return:
"""
coord = self.coordinates(*coordinates, R=R, Z=Z, coord_type=coord_type, grid=grid, **coords)
return self.F(coord) / coord.R
[docs]
def B_tor_rz(self, R, Z):
psi = self._spl_psi(R, Z, grid=False)
psi_n = (psi - self._psi_axis) / (self._psi_lcfs - self._psi_axis)
mask_out = psi_n > 1
F = self._fpol_spl(psi_n)
F[mask_out] = self.BvacR
return F / R
[docs]
@scalar_function
def abs_q(self, *coordinates, R: np.array = None, Z: np.array = None, coord_type=None, grid=False, **coords):
"""
Absolute value of q.
:param coordinates:
:param R:
:param Z:
:param coord_type:
:param grid:
:param coords:
:return:
"""
return np.abs(self.q(*coordinates, R=R, Z=Z, coord_type=coord_type, grid=grid, **coords))
[docs]
@scalar_function
def q(self, *coordinates, R: np.array = None, Z: np.array = None, coord_type=None, grid=False, **coords):
if not hasattr(self, '_q_spl'):
self._init_q()
coord = self.coordinates(*coordinates, R=R, Z=Z, coord_type=coord_type, grid=grid, **coords)
return self._q_spl(coord.psi_n)
[docs]
@scalar_function
def diff_q(self, *coordinates, R=None, Z=None, psi_n=None, coord_type=None, grid=False, **coords):
"""
:param self:
:param coordinates:
:param R:
:param Z:
:param psi_n:
:param coord_type:
:param grid:
:param coords:
:return: Derivative of q with respect to psi.
"""
if not hasattr(self, '_dq_dpsin_spl'):
self._init_q()
coord = self.coordinates(*coordinates, R=R, Z=Z, psi_n=psi_n, coord_type=coord_type, grid=grid, **coords)
return self._dq_dpsin_spl(coord.psi_n) * self._diff_psi_n
[docs]
@scalar_function
def shear(self, *coordinates, R=None, Z=None, psi_n=None, coord_type=None, grid=False, **coords):
r"""Normalized magnetic shear parameter
.. math::
\hat s = \frac{r_\mathrm{mid}}{q}\frac{\mathrm{d}q}{\mathrm{d}r}
where r_\mathrm{mid} is a plasma radius on the midplane.
"""
coord = self.coordinates(*coordinates, R=R, Z=Z, psi_n=psi_n, coord_type=coord_type, grid=grid, **coords)
q = self.q(coord)
dq_dpsi = self.diff_q(coord)
dpsi_dr = self.diff_psi(coord)
s = coord.r_mid / q * dq_dpsi * dpsi_dr
return s
[docs]
@scalar_function
def pol_flux(self, *coordinates, R: np.array = None, Z: np.array = None, coord_type=None, grid=False, **coords):
"""
Return poloidal flux in Wb which is not normalized by 2pi defiend as:
.. math::
\psi_\mathrm{pol} = - \rho_{Bp} \int B \mathrm{d}S
the result is obtained by normalization of the reference poloidal flux `psi`
with respect to the current value of the COCOS:
.. math::
\psi_\mathrm{pol} = (2 \pi)^{1 - e_{Bp}} \psi_\mathrm{ref}
:param coordinates:
:param R:
:param Z:
:param coord_type:
:param grid:
:param coords:
:return:
"""
cc = 2 * np.pi if self._cocosdic['exp_Bp'] == 0 else 1
return cc * self.psi(*coordinates, R=R, Z=Z, coord_type=coord_type, grid=grid, **coords)
[docs]
@scalar_function
def tor_flux(self, *coordinates, R: np.array = None, Z: np.array = None, coord_type=None, grid=False, **coords):
"""
Calculate toroidal magnetic flux :math:`\Phi` from:
.. math::
q = \frac{\mathrm{d \Phi} }{\mathrm{d \psi}}
:param coordinates:
:param R:
:param Z:
:param coord_type:
:param grid:
:param coords:
:return:
"""
if not hasattr(self, '_q_anideriv_spl'):
self._init_q()
cc = self._cocosdic['sigma_Bp'] * self._cocosdic['sigma_pol'] * ((2 * np.pi) ** (1 - self._cocosdic['exp_Bp']))
coord = self.coordinates(*coordinates, R=R, Z=Z, coord_type=coord_type, grid=grid, **coords)
return cc * self._q_anideriv_spl(coord.psi_n) * (1 / self._diff_psi_n)
[docs]
@scalar_function
def j_R(self, *coordinates, R: np.array = None, Z: np.array = None, coord_type=None, grid=True, **coords):
# todo test cocos here!
# todo: test grid
# todo: test test test
from scipy.constants import mu_0
coord = self.coordinates(*coordinates, R=R, Z=Z, coord_type=coord_type, grid=grid, **coords)
cc = self._cocosdic['sigma_cyl']
dpsi_dZ = self._spl_psi(coord.R, coord.Z, grid=coord.grid, dy=1)
if coord.grid:
dpsi_dZ = dpsi_dZ.T
return - cc * self.Fprime(coord) / (coord.R * mu_0) * dpsi_dZ
[docs]
@scalar_function
def j_Z(self, *coordinates, R: np.array = None, Z: np.array = None, coord_type=None, grid=True, **coords):
from scipy.constants import mu_0
coord = self.coordinates(*coordinates, R=R, Z=Z, coord_type=coord_type, grid=grid, **coords)
cc = self._cocosdic['sigma_cyl']
dpsi_dR = self._spl_psi(coord.R, coord.Z, grid=coord.grid, dx=1)
if coord.grid:
dpsi_dR = dpsi_dR.T
return cc * self.Fprime(coord) / (coord.R * mu_0) * dpsi_dR
[docs]
@scalar_function
def j_pol(self, *coordinates, R: np.array = None, Z: np.array = None, coord_type=None, grid=False, **coords):
r"""
Poloidal component of the current density.
Calculated as
.. math::
\frac{f' \nabla \psi }{R \mu_0}
[Wesson: Tokamaks, p. 105]
:param coordinates:
:param R:
:param Z:
:param coord_type:
:param grid:
:param coords:
:return:
"""
from scipy.constants import mu_0
coord = self.coordinates(*coordinates, R=R, Z=Z, coord_type=coord_type, grid=grid, **coords)
return self.Fprime(coord) / (coord.R * mu_0) * self.diff_psi(coord)
[docs]
@scalar_function
def j_tor(self, *coordinates, R: np.array = None, Z: np.array = None, coord_type=None, grid=True, **coords):
r"""
todo: to be tested
Toroidal component of the current denisity.
Calculated as
.. math::
R p' + \frac{1}{\mu_0 R} ff'
:param coordinates:
:param R:
:param Z:
:param coord_type:
:param grid:
:param coords:
:return:
"""
from scipy.constants import mu_0
coord = self.coordinates(*coordinates, R=R, Z=Z, coord_type=coord_type, grid=grid, **coords)
cc_norm = - self._cocosdic["sigma_Bp"] * (2 * np.pi) ** self._cocosdic["exp_Bp"]
return cc_norm * (coord.R * self.pprime(coord) + 1 / (mu_0 * coord.R) * self.FFprime(coord))
[docs]
def get_precise_lcfs(self):
"""
Calculate plasma LCFS by field line tracing technique and save LCFS as
instance property.
:return:
"""
from pleque.utils.surfaces import track_plasma_boundary
lcfs = track_plasma_boundary(self, self._x_point)
# todo debug this :) after this doesn't work eq.plot_overview()
# self._lcfs = lcfs
return lcfs
@property
def lcfs(self):
if not hasattr(self, '_lcfs_fl'):
if not surf.curve_is_closed(self._lcfs):
self._lcfs = np.vstack((self._lcfs, self._lcfs[0]))
self._lcfs_fl = self._as_fluxsurface(self._lcfs)
return self._lcfs_fl
@property
def separatrix(self):
"""
If the equilibrium is limited, returns lcfs. If it is diverted it returns separatrix flux surface
:return:
"""
if not self._limiter_plasma:
if not hasattr(self, '_separatrix'):
self._find_separatrix()
return self._as_fluxsurface(self._separatrix)
else:
return self.lcfs
def _find_separatrix(self):
"""
Finds separatrix contour by finding contour with normalized poloidal flux going to 1 from right....
The proceedure iterates 1+0.000001*counter and stops when the found contour has intersection points with the
first wall contour.
:return:
"""
found = False
cnt = 0
# todo: This should be rewritten
while not found and cnt < 101:
psi_n = 1 + 1e-6 * cnt
cnt += 1
separatrix = self._flux_surface(inlcfs=False, closed=None, psi_n=psi_n)
for flux_surf in separatrix:
# todo: this is not separatrix... for example in limiter plasma and without first wall
intersection = self.first_wall.intersection(flux_surf)
# The behaviour of intersetion function changed. This condition should catch both empty list and None:
if intersection and len(intersection) > 0:
poly = Polygon(flux_surf._string)
o_point = Point(self.magnetic_axis.R[0], self.magnetic_axis.Z[0])
if poly.contains(o_point):
self._separatrix = flux_surf.as_array(("R", "Z"))
found = True
break
return self._separatrix
@property
def contact_point(self):
"""
Returns contact point as instance of coordinates for circular plasmas. Returns None otherwise.
:return:
"""
if self._contact_point is None:
return None
else:
return self.coordinates(self._contact_point[0], self._contact_point[1])
@property
def strike_points(self):
"""
Returns contact point if the equilibrium is limited. If the equilibrium is diverted it returns strike points.
:return:
"""
if self._strike_points is None:
return None # This should not happen if the wall consists of enough points
else:
return self.coordinates(self._strike_points[:, 0], self._strike_points[:, 1])
@property
def limiter_point(self):
"""
The point which "limits" the LCFS of plasma. I.e. contact point in case of limiter plasma and x-point
in case of x-point plasma.
:return: Coordinates
"""
return self.coordinates(self._limiter_point[0], self._limiter_point[1])
@property
def x_point(self) -> Optional[Coordinates]:
"""
Return x-point closest in psi to mg-axis if presented on grid. None otherwise.
:return Coordinates
"""
if self._x_point is None:
return None
else:
return self.coordinates(*self._x_point)
@property
def secondary_x_point(self) -> Optional[Coordinates]:
"""
Returns the second closest (in units of psi) x-point to magnetic axis. None otherwise.
"""
if self._x_point2 is None:
return None
else:
return self.coordinates(*self._x_point2)
@property
def first_wall(self):
"""
If the first wall polygon is composed of 3 and more points Surface instance is returned.
If the wall contour is composed of less than 3 points, coordinate instance is returned, because Surface can't
be constructed
:return:
"""
if self._first_wall.shape[0] < 3:
return self.coordinates(self._first_wall)
else:
first_wall = self._first_wall
# first wall should be a closed contour
# todo: this should be chacked in init
if not surf.curve_is_closed(first_wall):
first_wall = np.concatenate((first_wall, first_wall[0, :][None, :]), axis=0)
return Surface(self, first_wall)
@property
def magnetic_axis(self):
return self.coordinates(self._mg_axis[0], self._mg_axis[1])
@property
def geometrical_axis(self):
"""
Geometrical axis of the plasma defined as
:math
R_{ax} = (max(R_{lcfs}) + min(R_{lcfs})) / 2
Z_{ax} = (max(Z_{lcfs}) + min(Z_{lcfs})) / 2
"""
r_ax = (self.lcfs.R.max() + self.lcfs.R.min()) / 2
z_ax = (self.lcfs.Z.max() + self.lcfs.Z.min()) / 2
return self.coordinates(r_ax, z_ax)
@property
def I_plasma(self):
"""
Toroidal plasma current. Calculated as toroidal current through the LCFS.
:return: (float) Value of toroidal plasma current.
"""
if not hasattr(self, "_Ip"):
self._Ip = self.lcfs.tor_current
return self._Ip
[docs]
def xp_section(self, length: float = 0.15) -> tuple["Coordinates"]:
"""
Return poloidal cross-sections of the planes of the x-point section (Σ_s).
Args:
length (float): Length around the X-point for computing sections. Defaults to 0.15.
Returns:
tuple[Coordinates]: Tuple of `Coordinates` objects representing each plane.
The order of x-point planes directions (with respect to x-point) (lfs, in-plasma, hfs, out) is preserved.
"""
secs = xp_sections(self._spl_psi, self._x_point[0], self._x_point[1], length=length)
secs_coords = tuple((self.coordinates(sec) for sec in secs))
return secs_coords
[docs]
def coordinates(self, *coordinates, coord_type=None, grid=False, **coords):
"""
Return instance of Coordinates. If instances of coordinates is already on the input, just pass it through.
:param coordinates:
:param coord_type:
:param grid:
:param coords:
:return:
"""
if len(coordinates) >= 1 and isinstance(coordinates[0], Coordinates):
return coordinates[0]
else:
return Coordinates(self, *coordinates, coord_type=coord_type, grid=grid, **coords)
def _as_fluxsurface(self, *coordinates, coord_type=None, grid=False, **coords):
"""
:param coordinates:
:param coord_type:
:param grid:
:param coords:
:return:
"""
from pleque import FluxSurface
if len(coordinates) >= 1 and isinstance(coordinates[0], FluxSurface):
return coordinates[0]
elif len(coordinates) >= 1 and isinstance(coordinates[0], Coordinates):
coord = coordinates[0]
return FluxSurface(self, coord.R, coord.Z)
else:
return FluxSurface(self, *coordinates, coord_type=coord_type, grid=grid, **coords)
[docs]
def in_first_wall(self, *coordinates, R: np.array = None, Z: np.array = None, coord_type=None, grid=True, **coords):
from pleque.utils.surfaces import points_inside_curve
# if grid:
# r_mesh, z_mesh = np.meshgrid(R, Z)
# points = np.vstack((r_mesh.ravel(), z_mesh.ravel())).T
# else:
# points = np.vstack((R, Z)).T
# mask_in = point_in_first_wall(self, points)
# return mask_in
points = self.coordinates(*coordinates, R=R, Z=Z, coord_type=coord_type, grid=grid, **coords)
mask_in = points_inside_curve(points.as_array(), self._first_wall)
if points.grid:
mask_in = mask_in.reshape(len(points.x2), len(points.x1))
return mask_in
[docs]
def in_lcfs(self, *coordinates, R: np.array = None, Z: np.array = None, coord_type=None, grid=True, **coords):
from pleque.utils.surfaces import points_inside_curve
# if grid:
# r_mesh, z_mesh = np.meshgrid(R, Z)
# points = np.vstack((r_mesh.ravel(), z_mesh.ravel())).T
# else:
# points = np.vstack((R, Z)).T
points = self.coordinates(*coordinates, R=R, Z=Z, coord_type=coord_type, grid=grid, **coords)
mask_in = points_inside_curve(points.as_array(), self._lcfs)
if points.grid:
mask_in = mask_in.reshape(len(points.x2), len(points.x1))
return mask_in
[docs]
def connection_length(self, *coordinates, R: np.array = None, Z: np.array = None,
coord_type=None, direction=1, **coords):
"""
Calculate connection length from given coordinates to first wall
Todo: The field line is traced to min/max value of z of first wall, distance is calculated to the last
point before first wall.
:param coordinates:
:param R:
:param Z:
:param coord_type:
:param direction: if positive trace field line in/cons the direction of magnetic field.
:param stopper: (None, 'poloidal', 'z-stopper) force to use stopper. If None stopper is
automatically chosen based on psi_n coordinate.
:param coords:
:return:
"""
coords = self.coordinates(*coordinates, R=R, Z=Z, coord_type=coord_type, **coords)
traces = self.trace_field_line(coords, direction=direction)
dists = []
lines = []
for t in traces:
if t.psi_n[0] > 1:
# todo: find first False!
mask_in = self.in_first_wall(t)
rzp = t.as_array()[mask_in, :]
# todo: add intersection point!
line_in = self.coordinates(rzp)
dist = line_in.length
dists.append(dist)
lines.append(line_in)
else:
dists.append(np.infty)
lines.append(None)
return dists, lines
[docs]
def trace_field_line(self, *coordinates, R: np.array = None, Z: np.array = None,
coord_type=None, direction=1, stopper_method=None, in_first_wall=False, **coords):
"""
Return traced field lines starting from the given set of at least 2d coordinates.
One poloidal turn is calculated for field lines inside the separatrix. Outter field lines
are limited by z planes given be outermost z coordinates of the first wall.
:param coordinates:
:param R:
:param Z:
:param coord_type:
:param direction: if positive trace field line in/cons the direction of magnetic field.
:param stopper_method: (None, 'poloidal', 'z-stopper) force to use stopper. If None stopper is
automatically chosen based on psi_n coordinate.
:param in_first_wall: if True the only inner part of field line is returned.
:param coords:
:return:
"""
import pleque.utils.field_line_tracers as flt
from scipy.integrate import solve_ivp
coords = self.coordinates(*coordinates, R=R, Z=Z, coord_type=coord_type, **coords)
res = []
# XXXNOW
coords_rz = coords.as_array(dim=2)
sigma_B0 = np.sign(self.F0)
dphifunc = flt.dphi_tracer_factory(self.B_R, self.B_Z, self.B_tor)
r_lims = [np.min(self.first_wall.R), np.max(self.first_wall.R)]
z_lims = [np.min(self.first_wall.Z), np.max(self.first_wall.Z)]
for i in np.arange(len(coords)):
y0 = coords_rz[i]
if coords.dim == 2:
phi0 = 0
else:
phi0 = coords.phi[i]
atol = 1e-6
if self.is_xpoint_plasma:
xp = self._x_point
xp_dist = np.sqrt(np.sum((xp - y0) ** 2))
atol = np.minimum(xp_dist * 1e-3, atol)
if self._verbose:
print('>>> tracing from: {:3f},{:3f},{:3f}'.format(y0[0], y0[1], phi0))
print('>>> atol = {}'.format(atol))
stopper = None
if stopper_method is None:
if coords.psi_n[i] <= 1:
# todo: determine the direction (now -1) !!
if self._verbose:
print('>>> poloidal stopper is used')
# XXX Direction (TODO)
# XXX add these values to cocos dict!
# sign(dtheta/dphi) = sigma_pol * sign(I * B)
# dphidtheta = self._cocosdic['sigma_pol'] * np.sign(self.I_plasma) * np.sign(self.F0)
# print('dir: {}\nsigma_pol: {}\nsigma_tor: {}\nIp: {}\nF0: {}'.format(
# direction, self._cocosdic['sigma_pol'], self._cocosdic['sigma_cyl'], self.I_plasma, self.F0
# ))
# print('------------------')
dphidtheta = np.sign(self.F0) * self._cocosdic['sigma_pol'] * self._cocosdic['sigma_cyl']
print('direction: {}'.format(direction))
print('dphidtheta: {}'.format(dphidtheta))
stopper = flt.poloidal_angle_stopper_factory(y0, self.magnetic_axis.as_array()[0],
dphidtheta * direction,
stop_res=np.pi / 1024)
else:
if self._verbose:
print('>>> z-lim stopper is used')
stopper = flt.rz_coordinate_stopper_factory(r_lims, z_lims)
elif stopper_method == 'z-stopper':
if self._verbose:
print('>>> z-lim stopper is used')
stopper = flt.rz_coordinate_stopper_factory(r_lims, z_lims)
elif stopper_method == 'poloidal':
if self._verbose:
print('>>> poloidal stopper is used')
dphidtheta = np.sign(self.F0) * self._cocosdic['sigma_pol'] * self._cocosdic['sigma_cyl']
stopper = flt.poloidal_angle_stopper_factory(y0, self.magnetic_axis.as_array()[0],
dphidtheta * direction,
stop_res=np.pi / 1024)
# todo: define somehow sufficient tolerances
sol = solve_ivp(dphifunc,
(phi0, direction * sigma_B0 * (2 * np.pi * 50 + phi0)),
y0,
# method='RK45',
method='LSODA',
events=stopper,
max_step=1e-2, # we want high phi resolution
atol=atol,
rtol=1e-8,
)
if self._verbose:
print("{}, {}".format(sol.message, sol.nfev))
phi = sol.t
R, Z = sol.y
fl = self.coordinates(R, Z, phi)
# XXX add condirtion to stopper
if in_first_wall:
mask = self.in_first_wall(fl)
imask = mask.astype(int)
in_idxs = np.where(imask[:-1] - imask[1:] == 1)[0]
last_idx = False
if len(in_idxs) >= 1:
# Last point is still in (+1)
last_idx = in_idxs[0]
mask[last_idx + 1:] = False
Rs = fl.R[mask]
Zs = fl.Z[mask]
phis = fl.phi[mask]
intersec = self.first_wall.intersection(fl, dim=2)
if intersec is not None and len(in_idxs) >= 1:
R_last = Rs[-1]
Z_last = Zs[-1]
inter_idx = np.argmin((intersec.R - R_last) ** 2 + (intersec.Z - Z_last) ** 2)
Rx = intersec.R[inter_idx]
Zx = intersec.Z[inter_idx]
# last_idx = len(phis) - 1
coef = np.sqrt((Rx - fl.R[last_idx]) ** 2 + (Zx - fl.Z[last_idx]) ** 2 /
(fl.R[last_idx + 1] - fl.R[last_idx]) ** 2 +
(fl.Z[last_idx + 1] - fl.Z[last_idx]) ** 2)
phix = fl.phi[last_idx] + coef * (fl.phi[last_idx + 1] - fl.phi[last_idx])
Rs = np.append(Rs, Rx)
Zs = np.append(Zs, Zx)
phis = np.append(phis, phix)
fl = self.coordinates(Rs, Zs, phis)
res.append(fl)
return res
[docs]
def lcfs_field_line(self, vect_no=0, xp_shift=1e-6, phi0: float = 0.0):
"""
Computes (some) field line laying on last closed flux surface.
Parameters:
vect_no: int
Index of eigenvector determing the direction of integration. Default is 0.
xp_shift: float
A small positional adjustment for the x-point in the plasma boundary tracking.
Default is 1e-6.
phi0: float
Toroidal angle on which is field line initiated.
Returns:
list
A representation of the LCFS field line as a result of the plasma boundary
tracking method.
"""
lcfs = track_plasma_boundary(self, self._x_point, vect_no=0, xp_shift=1e-6, phi_0=phi0)
return lcfs
[docs]
def trace_flux_surface(self, *coordinates, s_resolution=1e-3, R=None,
Z=None, psi_n=None, coord_type=None, **coords):
"""
Find a closed flux surface inside LCFS with requested values of psi or psi-normalized.
TODO support open and/or flux surfaces outise LCFS, needs different stopper
:param R:
:param Z:
:param psi_n:
:param coord_type:
:param coordinates: specifies flux surface to search for (by spatial point or values of psi or psi normalised).
If coordinates is spatial point (dim=2) then the trace starts at the midplane.
Coordinates.grid must be False.
:param s_resolution: max_step in the distance along the flux surface contour
:return: FluxSurface
"""
import pleque.utils.field_line_tracers as flt
from scipy.integrate import solve_ivp
coords = self.coordinates(*coordinates, R=R, Z=Z, coord_type=coord_type, **coords)
if coords.dim == 1:
coords = self.coordinates(R=self.magnetic_axis.R + coords.r_mid, Z=0)
y0 = np.reshape([coords.R, coords.Z], (2,))
ds_func = flt.ds_grad_psi_tracer_factory(self._spl_psi)
# atol is square of s_resolution to offset the square distance
stopper = flt.rz_target_s_min_stopper_factory(y0, coords.r_mid,
atol=s_resolution ** 2)
sol = solve_ivp(ds_func,
(0, coords.r_mid * 2 * np.pi * 4),
y0,
method='LSODA',
events=stopper,
max_step=s_resolution,
vectorized=True,
# rtol=1e-8,
)
fs = self._as_fluxsurface(R=np.hstack([sol.y[0], sol.y[0, 0]]),
Z=np.hstack([sol.y[1], sol.y[1, 0]]))
fs._cum_length = np.hstack([sol.t, sol.t[-1] + s_resolution]) # TODO use some setter
return fs
@property
def fluxfuncs(self):
if not hasattr(self, '_fluxfunc'):
self._fluxfunc = FluxFunctions(self) # filters out methods from self
return self._fluxfunc
@property
def surfacefuncs(self):
if not hasattr(self, '_surfacefunc'):
self._surfacefunc = SurfaceFunctions(self) # filters out methods from self
return self._surfacefunc
[docs]
def to_geqdsk(self, file, nx=64, ny=128, q_positive=True, use_basedata=False, cocos_out=3):
"""
Write a GEQDSK/g-file equilibrium file.
:param file: str, file name
:param nx: int, number radial points and profiles points
:param ny: int, number of vertical points
:param use_basedata: The original basedata of equilibrium are used instead of interpolation splines.
If this option is chosen, the nx and ny parameters are ignored.
:param q_positive: Save q value always positive.
:param cocos_out: Number of output cocos. `None` if not changed.
Warning: `cocos_out` parameter only perform 2 pi normalization at the moment.
Default value is 3, which corresponds to standard eqdsk EFIT output.
"""
import pleque.io.geqdsk as geqdsk
geqdsk.write(self, file, nx=nx, ny=ny, q_positive=q_positive, use_basedata=use_basedata,
cocos_out=cocos_out)
@property
def cocos(self):
"""
Number of internal COCOS representation.
:return: int
"""
return self._cocos
@property
def is_xpoint_plasma(self):
"""
Return true for x-point plasma.
:return: bool
"""
return not self._limiter_plasma
@property
def is_limter_plasma(self):
"""
Return true if the plasma is limited by point or some limiter point.
:return: bool
"""
return self._limiter_plasma
def _map_midplane2psi(self):
from scipy.interpolate import UnivariateSpline
r_mid = np.linspace(0, self.R_max - self._mg_axis[0], 100)
psi_mid = self.psi(r_mid + self._mg_axis[0], self._mg_axis[1] * np.ones_like(r_mid), grid=False)
if self._psi_axis < self._psi_lcfs:
# psi increasing:
idxs = arglis(psi_mid)
else:
# psi decreasing
idxs = arglis(psi_mid[::-1])
idxs = idxs[::-1]
psi_mid = psi_mid[idxs]
r_mid = r_mid[idxs]
self._rmid_spl = UnivariateSpline(psi_mid, r_mid, k=3, s=0)
def _init_fluxsurfaces(self, npsi: Optional[int] = None, psi_n_levels: Optional[Sequence[float]] = None):
if psi_n_levels and npsi:
raise ValueError("npsi and psi_n_levels cannot be used simultaneously.")
if psi_n_levels is None:
psi0 = settings.psin0
if npsi is None:
npsi = settings.npsi_grid
psi_n_levels = np.linspace(psi0, 1, npsi)
else:
npsi = len(psi_n_levels)
# todo: Now I need to rewrite find_flux_surface function first to used multiple psi levels.
surfs = []
for psi_n in psi_n_levels:
surf = self.find_flux_surface(psi_n=psi_n)[0]
surfs.append(surf)
self._flux_surfaces = surfs
def _init_q(self):
psi_n = np.arange(0.01, 1, 0.005)
qs = []
if self._verbose:
print("--- Generating q-splines ---")
for i, pn in enumerate(psi_n):
if self._verbose and np.mod(i, 20) == 0:
print("{:.0f}%\r".format(pn / np.max(psi_n) * 100))
surface = self._flux_surface(psi_n=pn)
c = surface[0]
qs.append(c.eval_q)
qs = np.array(qs)
self._q_spl = UnivariateSpline(psi_n, qs, s=0, k=3)
self._dq_dpsin_spl = self._q_spl.derivative()
self._q_anideriv_spl = self._q_spl.antiderivative()