1. 데이터 생성

(1) reference

Stochastic parametrizations and model uncertainty in the Lorenz ’96 system

(2) 파라미터 셋팅

파라미터 항목 Arnold Kim
Initial Condition 개수 300 300
dt 0.001 0.005
Transient 언급 없음 3
개별 IC당 적분 길이 언급 없음 10
ode_solver Adaptive RK4 ERK4
X variables 8 36
Y variables per X 32 10
h 1 1
b 10 10
c 4 or 10 10
F 20 실험 결과, 20으로 지정했을 때 correlation이 0 근처로 가는 경우가 많음

image.png

(3) 실험 코드

GitHub - m2lines/L96_demo: Lorenz 1996 two time-scale model for learning machine learning

""" Lorenz-96 model
Lorenz E., 1996. Predictability: a problem partly solved. In
Predictability. Proc 1995. ECMWF Seminar, 1-18.
<https://www.ecmwf.int/en/elibrary/10829-predictability-problem-partly-solved>
"""

import os
import numpy as np

from numba import jit, njit
from tqdm import tqdm

@njit
def L96_eq1_xdot(X, F, advect=True):
    """
    Calculate the time rate of change for the X variables for the Lorenz '96, equation 1:
        d/dt X[k] = -X[k-2] X[k-1] + X[k-1] X[k+1] - X[k] + F

    Args:
        X : Values of X variables at the current time step
        F : Forcing term
    Returns:
        dXdt : Array of X time tendencies
    """

    K = len(X)
    Xdot = np.zeros(K)

    if advect:
        Xdot = np.roll(X, 1) * (np.roll(X, -1) - np.roll(X, 2)) - X + F
    else:
        Xdot = -X + F
    #     for k in range(K):
    #         Xdot[k] = ( X[(k+1)%K] - X[k-2] ) * X[k-1] - X[k] + F
    return Xdot

@njit
def L96_2t_xdot_ydot(X, Y, F, h, b, c):
    """
    Calculate the time rate of change for the X and Y variables for the Lorenz '96, two time-scale
    model, equations 2 and 3:
        d/dt X[k] =     -X[k-1] ( X[k-2] - X[k+1] )   - X[k] + F - h.c/b sum_j Y[j,k]
        d/dt Y[j] = -b c Y[j+1] ( Y[j+2] - Y[j-1] ) - c Y[j]     + h.c/b X[k]

    Args:
        X : Values of X variables at the current time step
        Y : Values of Y variables at the current time step
        F : Forcing term
        h : coupling coefficient
        b : ratio of amplitudes
        c : time-scale ratio
    Returns:
        dXdt, dYdt, C : Arrays of X and Y time tendencies, and the coupling term -hc/b*sum(Y,j)
    """

    JK, K = len(Y), len(X)
    J = JK // K
    assert JK == J * K, "X and Y have incompatible shapes"
    Xdot = np.zeros(K)
    hcb = (h * c) / J

    Ysummed = Y.reshape((K, J)).sum(axis=-1)

    Xdot = np.roll(X, 1) * (np.roll(X, -1) - np.roll(X, 2)) - X + F - h * Ysummed / J
    Ydot = (
        -c * J * np.roll(Y, -1) * (np.roll(Y, -2) - np.roll(Y, 1))
        - c * Y
        + hcb * np.repeat(X, J)
    )

    return Xdot, Ydot, - h * Ysummed / J

# Time-stepping methods ##########################################################################################

def EulerFwd(fn, dt, X, *params):
    """
    Calculate the new state X(n+1) for d/dt X = fn(X,t,F) using the Euler forward method.

    Args:
        fn : The function returning the time rate of change of model variables X
        dt : The time step
        X  : Values of X variables at the current time, t
        params : All other arguments that should be passed to fn, i.e. fn(X, t, *params)

    Returns:
        X at t+dt
    """

    return X + dt * fn(X, *params)

def RK2(fn, dt, X, *params):
    """
    Calculate the new state X(n+1) for d/dt X = fn(X,t,F) using the second order Runge-Kutta method.

    Args:
        fn : The function returning the time rate of change of model variables X
        dt : The time step
        X  : Values of X variables at the current time, t
        params : All other arguments that should be passed to fn, i.e. fn(X, t, *params)

    Returns:
        X at t+dt
    """

    X1 = X + 0.5 * dt * fn(X, *params)
    return X + dt * fn(X1, *params)

def RK4(fn, dt, X, *params):
    """
    Calculate the new state X(n+1) for d/dt X = fn(X,t,...) using the fourth order Runge-Kutta method.

    Args:
        fn     : The function returning the time rate of change of model variables X
        dt     : The time step
        X      : Values of X variables at the current time, t
        params : All other arguments that should be passed to fn, i.e. fn(X, t, *params)

    Returns:
        X at t+dt
    """

    Xdot1 = fn(X, *params)
    Xdot2 = fn(X + 0.5 * dt * Xdot1, *params)
    Xdot3 = fn(X + 0.5 * dt * Xdot2, *params)
    Xdot4 = fn(X + dt * Xdot3, *params)
    return X + (dt / 6.0) * ((Xdot1 + Xdot4) + 2.0 * (Xdot2 + Xdot3))

# Model integrators #############################################################################################

# @jit(forceobj=True)
def integrate_L96_1t(X0, F, dt, nt, method=RK4, t0=0):
    """
    Integrates forward-in-time the single time-scale Lorenz 1996 model, using the integration "method".
    Returns the full history with nt+1 values starting with initial conditions, X[:,0]=X0, and ending
    with the final state, X[:,nt+1] at time t0+nt*dt.

    Args:
        X0     : Values of X variables at the current time
        F      : Forcing term
        dt     : The time step
        nt     : Number of forwards steps
        method : The time-stepping method that returns X(n+1) given X(n)
        t0     : Initial time (defaults to 0)

    Returns:
        X[:,:], time[:] : the full history X[n,k] at times t[n]

    Example usage:
        X,t = integrate_L96_1t(5+5*np.random.rand(8), 18, 0.01, 500)
        plt.plot(t, X);
    """

    time, hist = t0 + np.zeros((nt + 1)), np.zeros((nt + 1, len(X0)))
    X = X0.copy()
    hist[0, :] = X
    for n in range(nt):
        X = method(L96_eq1_xdot, dt, X, F)
        hist[n + 1], time[n + 1] = X, t0 + dt * (n + 1)
    return hist, time

# @jit(forceobj=True)
def integrate_L96_2t(X0, Y0, si, nt, F, h, b, c, t0=0, dt=0.001):
    """
    Integrates forward-in-time the two time-scale Lorenz 1996 model, using the RK4 integration method.
    Returns the full history with nt+1 values starting with initial conditions, X[:,0]=X0 and Y[:,0]=Y0,
    and ending with the final state, X[:,nt+1] and Y[:,nt+1] at time t0+nt*si.

    Note the model is intergrated

    Args:
        X0 : Values of X variables at the current time
        Y0 : Values of Y variables at the current time
        si : Sampling time interval
        nt : Number of sample segments (results in nt+1 samples incl. initial state)
        F  : Forcing term
        h  : coupling coefficient
        b  : ratio of amplitudes
        c  : time-scale ratio
        t0 : Initial time (defaults to 0)
        dt : The actual time step. If dt<si, then si is used. Otherwise si/dt must be a whole number. Default 0.001.

    Returns:
        X[:,:], Y[:,:], time[:] : the full history X[n,k] and Y[n,k] at times t[n]

    Example usage:
        X,Y,t = integrate_L96_2t(5+5*np.random.rand(8), np.random.rand(8*4), 0.01, 500, 18, 1, 10, 10)
        plt.plot( t, X);
    """

    xhist, yhist, time, _ = integrate_L96_2t_with_coupling(
        X0, Y0, si, nt, F, h, b, c, t0=t0, dt=dt
    )

    return xhist, yhist, time

# @jit(forceobj=True)
def integrate_L96_2t_with_coupling(X0, Y0, si, nt, F, h, b, c, t0=0, dt=0.001):
    """
    Integrates forward-in-time the two time-scale Lorenz 1996 model, using the RK4 integration method.
    Returns the full history with nt+1 values starting with initial conditions, X[:,0]=X0 and Y[:,0]=Y0,
    and ending with the final state, X[:,nt+1] and Y[:,nt+1] at time t0+nt*si.

    Note the model is intergrated

    Args:
        X0 : Values of X variables at the current time
        Y0 : Values of Y variables at the current time
        si : Sampling time interval
        nt : Number of sample segments (results in nt+1 samples incl. initial state)
        F  : Forcing term
        h  : coupling coefficient
        b  : ratio of amplitudes
        c  : time-scale ratio
        t0 : Initial time (defaults to 0)
        dt : The actual time step. If dt<si, then si is used. Otherwise si/dt must be a whole number. Default 0.001.

    Returns:
        X[:,:], Y[:,:], time[:], hcbY[:,:] : the full history X[n,k] and Y[n,k] at times t[n], and coupling term

    Example usage:
        X,Y,t,_ = integrate_L96_2t_with_coupling(5+5*np.random.rand(8), np.random.rand(8*4), 0.01, 500, 18, 1, 10, 10)
        plt.plot( t, X);
    """

    time, xhist, yhist, xytend_hist = (
        t0 + np.zeros((nt + 1)),
        np.zeros((nt + 1, len(X0))),
        np.zeros((nt + 1, len(Y0))),
        np.zeros((nt + 1, len(X0))),
    )
    X, Y = X0.copy(), Y0.copy()
    xhist[0, :] = X
    yhist[0, :] = Y
    xytend_hist[0, :] = 0
    if si < dt:
        dt, ns = si, 1
    else:
        ns = int(si / dt + 0.5)
        assert (
            abs(ns * dt - si) < 1e-14
        ), "si is not an integer multiple of dt: si=%f dt=%f ns=%i" % (si, dt, ns)

    for n in range(nt):
        for s in range(ns):
            # RK4 update of X,Y
            Xdot1, Ydot1, XYtend = L96_2t_xdot_ydot(X, Y, F, h, b, c)
            Xdot2, Ydot2, _ = L96_2t_xdot_ydot(
                X + 0.5 * dt * Xdot1, Y + 0.5 * dt * Ydot1, F, h, b, c
            )
            Xdot3, Ydot3, _ = L96_2t_xdot_ydot(
                X + 0.5 * dt * Xdot2, Y + 0.5 * dt * Ydot2, F, h, b, c
            )
            Xdot4, Ydot4, _ = L96_2t_xdot_ydot(
                X + dt * Xdot3, Y + dt * Ydot3, F, h, b, c
            )
            X = X + (dt / 6.0) * ((Xdot1 + Xdot4) + 2.0 * (Xdot2 + Xdot3))
            Y = Y + (dt / 6.0) * ((Ydot1 + Ydot4) + 2.0 * (Ydot2 + Ydot3))

        xhist[n + 1], yhist[n + 1], time[n + 1], xytend_hist[n + 1] = (
            X,
            Y,
            t0 + si * (n + 1),
            XYtend,
        )
    return xhist, yhist, time, xytend_hist

def s(k, K):
    """A non-dimension coordinate from -1..+1 corresponding to k=0..K"""
    return 2 * k / K - 1

def validate_simulation_data(X_forecast, Y_forecast, t_forecast, C_forecast, ic_num):
    """
    시뮬레이션 데이터의 품질을 검증하는 함수
    
    Args:
        X_forecast: X 변수 예측 데이터
        Y_forecast: Y 변수 예측 데이터  
        t_forecast: 시간 데이터
        C_forecast: Coupling 데이터
        ic_num: 초기 조건 번호
        
    Returns:
        bool: 검증 통과 여부
    """
    # 1. NaN/Inf 검증
    if (np.any(np.isnan(X_forecast)) or np.any(np.isnan(Y_forecast)) or 
        np.any(np.isnan(t_forecast)) or np.any(np.isnan(C_forecast))):
        print(f"IC {ic_num}: NaN 값이 발견되어 검증 실패")
        return False
        
    if (np.any(np.isinf(X_forecast)) or np.any(np.isinf(Y_forecast)) or 
        np.any(np.isinf(t_forecast)) or np.any(np.isinf(C_forecast))):
        print(f"IC {ic_num}: Inf 값이 발견되어 검증 실패")
        return False
    
    # 2. 극단적인 값 검증 (발산 감지)
    x_max_abs = np.max(np.abs(X_forecast))
    y_max_abs = np.max(np.abs(Y_forecast))
    c_max_abs = np.max(np.abs(C_forecast))
    
    if x_max_abs > 1000:
        print(f"IC {ic_num}: X 변수가 발산 (최대 절댓값: {x_max_abs:.2f})")
        return False
        
    if y_max_abs > 1000:
        print(f"IC {ic_num}: Y 변수가 발산 (최대 절댓값: {y_max_abs:.2f})")
        return False
        
    if c_max_abs > 1000:
        print(f"IC {ic_num}: Coupling 변수가 발산 (최대 절댓값: {c_max_abs:.2f})")
        return False
    
    # 3. 데이터 범위 검증 (물리적으로 의미있는 범위)
    x_range = np.max(X_forecast) - np.min(X_forecast)
    y_range = np.max(Y_forecast) - np.min(Y_forecast)
    
    if x_range < 0.1:  # X 변수가 거의 변화하지 않음
        print(f"IC {ic_num}: X 변수의 변화가 너무 작음 (범위: {x_range:.6f})")
        return False
        
    if y_range < 0.01:  # Y 변수가 거의 변화하지 않음
        print(f"IC {ic_num}: Y 변수의 변화가 너무 작음 (범위: {y_range:.6f})")
        return False
    
    # 4. 시간 데이터 검증
    if len(t_forecast) < 100:  # 최소 시간 스텝 수 확인
        print(f"IC {ic_num}: 시간 스텝이 너무 적음 ({len(t_forecast)})")
        return False
    
    # 5. 데이터 일관성 검증
    if (X_forecast.shape[0] != Y_forecast.shape[0] or 
        X_forecast.shape[0] != t_forecast.shape[0] or 
        X_forecast.shape[0] != C_forecast.shape[0]):
        print(f"IC {ic_num}: 데이터 차원이 일치하지 않음")
        return False
    
    # 6. 통계적 검증 (이상치 감지)
    x_std = np.std(X_forecast)
    y_std = np.std(Y_forecast)
    
    if x_std < 0.01:  # X 변수의 표준편차가 너무 작음
        print(f"IC {ic_num}: X 변수의 표준편차가 너무 작음 ({x_std:.6f})")
        return False
        
    if y_std < 0.001:  # Y 변수의 표준편차가 너무 작음
        print(f"IC {ic_num}: Y 변수의 표준편차가 너무 작음 ({y_std:.6f})")
        return False
    
    print(f"IC {ic_num}: 검증 통과 ✓ (X_max: {x_max_abs:.2f}, Y_max: {y_max_abs:.2f}, C_max: {c_max_abs:.2f})")
    return True

# Class for convenience
class L96s:
    """
    Class for single time-scale Lorenz 1996 model
    """

    X = None  # Current X state or initial conditions
    F = None  # Forcing
    dt = None  # Default time-step
    method = None  # Integration method

    def __init__(self, K, dt, F=18, method=EulerFwd, t=0):
        """Construct a single time-scale model with parameters:
        K      : Number of X values
        dt     : time-step
        F      : Forcing term
        t      : Initial time
        method : Integration method, e.g. EulerFwd, RK2, or RK4
        """
        self.F, self.dt = F, dt
        self.method = method
        self.X, self.t = F * np.random.randn(K), t
        self.K = self.X.size  # For convenience
        self.k = np.arange(self.K)  # For plotting

    def __repr__(self):
        return (
            "L96: "
            + "K="
            + str(self.K)
            + " F="
            + str(self.F)
            + " dt="
            + str(self.dt)
            + " method="
            + str(self.method)
        )

    def __str__(self):
        return self.__repr__() + "\\n X=" + str(self.X) + "\\n t=" + str(self.t)

    def copy(self):
        copy = L96s(self.K, self.dt, F=self.F, method=self.method)
        copy.set_state(self.X, t=self.t)
        return copy

    def print(self):
        print(self)

    def set_param(self, dt=None, F=None, t=None, method=None):
        """Set a model parameter, e.g. .set_param(dt=0.002)"""
        if dt is not None:
            self.dt = dt
        if F is not None:
            self.F = F
        if t is not None:
            self.t = t
        if method is not None:
            self.method = method
        return self

    def set_state(self, X, t=None):
        """Set initial conditions (or current state), e.g. .set_state(X)"""
        self.X = X
        self.K = self.X.size  # For convenience
        self.k = np.arange(self.K)  # For plotting
        if t is not None:
            self.t = t
        return self

    def randomize_IC(self):
        """Randomize the initial conditions (or current state)"""
        self.X = self.F * np.random.rand(self.X.size)
        return self

    def run(self, T, store=False):
        """Run model for a total time of T.
        If store=Ture, then stores the final state as the initial conditions for the next segment.
        Returns full history: X[:,:],t[:]."""
        nt = int(T / self.dt)
        X, t = integrate_L96_1t(
            self.X, self.F, self.dt, nt, method=self.method, t0=self.t
        )
        if store:
            self.X, self.t = X[-1], t[-1]
        return X, t

class L96:
    """
    Class for two time-scale Lorenz 1996 model
    """

    X = "Current X state or initial conditions"
    Y = "Current Y state or initial conditions"
    F = "Forcing"
    h = "Coupling coefficient"
    b = "Ratio of timescales"
    c = "Ratio of amplitudes"
    dt = "Time step"

    def __init__(self, K, J, F=18, h=1, b=10, c=10, t=0, dt=0.001):
        """Construct a two time-scale model with parameters:
        K  : Number of X values
        J  : Number of Y values per X value
        F  : Forcing term (default 18.)
        h  : coupling coefficient (default 1.)
        b  : ratio of amplitudes (default 10.)
        c  : time-scale ratio (default 10.)
        t  : Initial time (default 0.)
        dt : Time step (default 0.001)
        """
        self.F, self.h, self.b, self.c, self.dt = F, h, b, c, dt
        self.X, self.Y, self.t = b * np.random.randn(K), np.random.randn(J * K), t
        self.K, self.J, self.JK = K, J, J * K  # For convenience
        self.k, self.j = np.arange(self.K), np.arange(self.JK)  # For plotting

    def __repr__(self):
        return (
            "L96: "
            + "K="
            + str(self.K)
            + " J="
            + str(self.J)
            + " F="
            + str(self.F)
            + " h="
            + str(self.h)
            + " b="
            + str(self.b)
            + " c="
            + str(self.c)
            + " dt="
            + str(self.dt)
        )

    def __str__(self):
        return (
            self.__repr__()
            + "\\n X="
            + str(self.X)
            + "\\n Y="
            + str(self.Y)
            + "\\n t="
            + str(self.t)
        )

    def copy(self):
        copy = L96(self.K, self.J, F=self.F, h=self.h, b=self.b, c=self.c, dt=self.dt)
        copy.set_state(self.X, self.Y, t=self.t)
        return copy

    def print(self):
        print(self)

    def set_param(self, dt=None, F=None, h=None, b=None, c=None, t=None):
        """Set a model parameter, e.g. .set_param(si=0.01, dt=0.002)"""
        if dt is not None:
            self.dt = dt
        if F is not None:
            self.F = F
        if h is not None:
            self.h = h
        if b is not None:
            self.b = b
        if c is not None:
            self.c = c
        if t is not None:
            self.t = t
        return self

    def set_state(self, X, Y, t=None):
        """Set initial conditions (or current state), e.g. .set_state(X,Y)"""
        self.X, self.Y = X, Y
        if t is not None:
            self.t = t
        self.K, self.JK = self.X.size, self.Y.size  # For convenience
        self.J = self.JK // self.K
        self.k, self.j = np.arange(self.K), np.arange(self.JK)  # For plotting
        return self

    def randomize_IC(self):
        """Randomize the initial conditions (or current state)"""
        X, Y = self.b * np.random.rand(self.X.size), np.random.rand(self.Y.size)
        return self.set_state(X, Y)

    def run(self, si, T, store=False, return_coupling=False):
        """Run model for a total time of T, sampling at intervals of si.
        If store=Ture, then stores the final state as the initial conditions for the next segment.
        If return_coupling=True, returns C in addition to X,Y,t.
        Returns sampled history: X[:,:],Y[:,:],t[:],C[:,:]."""
        nt = int(T / si)
        X, Y, t, C = integrate_L96_2t_with_coupling(
            self.X,
            self.Y,
            si,
            nt,
            self.F,
            self.h,
            self.b,
            self.c,
            t0=self.t,
            dt=self.dt,
        )
        if store:
            print(f"\\nt : {self.t, t[-1]}")
            self.X, self.Y, self.t = X[-1], Y[-1], t[-1]
        if return_coupling:
            return X, Y, t, C
        else:
            return X, Y, t

    
if __name__ == "__main__":
    K = 8 # Number of globa-scale variables X
    J = 32 # Number of local-scale Y variables per single global-scale X variable
    F = 15.0 # Focring
    b = 10.0 # ratio of amplitudes
    c = 10.0 # time-scale ratio
    h = 1.0 # Coupling coefficient
    noise = 0.03

    si = 0.005  # Sampling time interval
    dt = 0.005  # Time step
    
    print("\\n=== Running Main Experiment ===")

    num_ic = 300  # 초기 조건 개수
    ic_interval = 10  # 적분의 마지막 포인트 이후 다음 적분을 시작할 초기 조건까지의 간격 (MTU)
    forecast_time = 10  # # Hist_Deterministic.py의 nt = 각 초기 조건 별 적분 기간 (MTU) (10 / 0.005 = 20000)
    
    # 데이터 저장을 위한 배열 초기화
    # 메모리 효율성을 위해 결과를 점진적으로 저장
    time_steps_per_ic = int(forecast_time / si) + 1
    all_X = np.zeros((num_ic, time_steps_per_ic, K))
    all_Y = np.zeros((num_ic, time_steps_per_ic, J * K))
    all_t = np.zeros((num_ic, time_steps_per_ic))
    all_C = np.zeros((num_ic, time_steps_per_ic, K))
    
    
    import time
    start_time = time.time()
    k = np.arange(K)
    j = np.arange(J * K)

    Xinit = s(k, K) * (s(k, K) - 1) * (s(k, K) + 1)
    Yinit = 0 * s(j, J * K) * (s(j, J * K) - 1) * (s(j, J * K) + 1)

    # 모델 초기화
    coupled_system = L96(K, J, F=F, h=h, b=b, c=c, dt=dt)
    coupled_system.set_state(Xinit, Yinit)
    spinup_time = 3 # Hist_Deterministic.py의 nt_pre = (10 / 0.005 = 2000)
    count_ic = 0
    while count_ic < num_ic:
        # 3 MTU 동안 스핀업 실행 및 spinup 상태 저장
        X_spinup, Y_spinup, t_spinup, C_spinup = coupled_system.run(si, spinup_time, store=True, return_coupling=True)
        # 이후 10 MTU 동안 적분 후 마지막 상태 저장
        X_forecast, Y_forecast, t_forecast, C_forecast = coupled_system.run(si, forecast_time, store=True, return_coupling=True)
        # 데이터 검증 수행
        if not validate_simulation_data(X_forecast, Y_forecast, t_forecast, C_forecast, count_ic + 1):
            print(f"IC {count_ic + 1}: Coupling System 검증 실패로 재시도합니다.")
            coupled_system.randomize_IC()
            continue
            
        # 검증 통과 시 데이터 저장
        print(f"IC {count_ic + 1}: 검증 통과, Coupling System 데이터 저장 중...")
        
        # 적분 결과 저장
        all_X[count_ic] = X_forecast
        all_Y[count_ic] = Y_forecast
        all_t[count_ic] = t_forecast
        all_C[count_ic] = C_forecast
        count_ic += 1
        coupled_system.randomize_IC()

    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"\\n모든 Coupling System 초기 조건 처리 완료! 소요 시간: {elapsed_time:.2f}초")

    # 데이터를 50개 초기 조건씩 분할하여 저장
    results_dir = os.path.join(os.getcwd(), "simulated_data")
    os.makedirs(results_dir, exist_ok=True)

    batch_size = 1
    num_batches = (num_ic + batch_size - 1) // batch_size  # 올림 나눗셈

    print(f"\\n총 {num_ic}개의 검증된 데이터를 {num_batches}개 배치로 저장합니다...")

    for batch in tqdm(range(num_batches), desc="Coupling System 데이터 저장 중"):
        start_idx = batch * batch_size
        end_idx = min((batch + 1) * batch_size, num_ic)
        
        # 저장 전 최종 검증
        batch_X = all_X[start_idx:end_idx]
        batch_Y = all_Y[start_idx:end_idx]
        batch_t = all_t[start_idx:end_idx]
        batch_C = all_C[start_idx:end_idx]

    print(f"\\n총 {num_ic}개의 검증된 single system 데이터 생성 시작...")

    start_time = time.time()
    single_system = L96s(K, dt, F=F, method=EulerFwd)
    single_system.set_state(Xinit)
    all_X_single = np.zeros((num_ic, time_steps_per_ic, K))
    all_t_single = np.zeros((num_ic, time_steps_per_ic))

    count_ic = 0
    while count_ic < num_ic:
        # 3 MTU 동안 스핀업 실행 및 spinup 상태 저장
        X_spinup_single, t_spinup_single = single_system.run(si, spinup_time, store=True)
        # 이후 10 MTU 동안 적분 후 마지막 상태 저장
        X_forecast_single, t_forecast_single = single_system.run(si, forecast_time, store=True)
        # 데이터 검증 수행
        if not validate_simulation_data(X_forecast_single, t_forecast_single, count_ic + 1):
            print(f"IC {count_ic + 1}: 검증 실패로 재시도합니다.")
            single_system.randomize_IC()
            continue
            
        # 검증 통과 시 데이터 저장
        print(f"IC {count_ic + 1}: 검증 통과, single system 데이터 저장 중...")
        
        all_X_single[count_ic] = X_forecast_single
        all_t_single[count_ic] = t_forecast_single
        single_system.randomize_IC()

    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"\\n모든 single system 초기 조건 처리 완료! 소요 시간: {elapsed_time:.2f}초")

    for batch in tqdm(range(num_batches), desc="Single System 데이터 저장 중"):
        start_idx = batch * batch_size
        end_idx = min((batch + 1) * batch_size, num_ic)
        
        # 저장 전 최종 검증
        batch_X_single = all_X_single[start_idx:end_idx]
        batch_t_single = all_t_single[start_idx:end_idx]
        
        # 각 배치의 데이터 저장
        np.save(f"{results_dir}/X_batch_single_{batch+1}.npy", batch_X_single)
        np.save(f"{results_dir}/t_batch_single_{batch+1}.npy", batch_t_single)
        
        
    # 메타데이터 저장
    metadata = {
        'K': K,
        'J': J,
        'F': F,
        'h': h,
        'b': b,
        'c': c,
        'dt': dt,
        'si': si,
        'spinup_time': spinup_time,
        'forecast_time': forecast_time,
        'ic_interval': ic_interval,
        'num_ic': num_ic,
        'time_steps_per_ic': time_steps_per_ic,
        'num_batches': num_batches,
        'batch_size': batch_size
    }
    import json
    with open(f"{results_dir}/metadata.json", "w") as f:
        json.dump(metadata, f)

    print(f"\\n=== 데이터 생성 및 저장 완료 ===")
    print(f"저장 위치: {results_dir}")
    print(f"총 {num_ic}개의 검증된 초기 조건")
    print(f"총 {num_batches}개의 배치 파일")
    print(f"모든 데이터가 품질 검증을 통과하여 저장되었습니다. ✓")

(4) Correlation Matrix

1번 테스트

Test1_X1_correlation.png

Test1_X1_correlation_distribution.png