In [None]:
# Import notwendiger Bibliotheken
import nussl
from common import data, viz
from IPython.display import Audio
import IPython
import tensorflow as tf
from tensorflow.keras import layers, models
import numpy as np
import librosa
import os
from scipy import signal
import matplotlib.pyplot as plt


In [4]:
# Set the STFT parameters (a second, larger configuration is kept commented out below)
stft_params = nussl.STFTParams(window_length=512, hop_length=128, window_type='sqrt_hann')
#stft_params = nussl.STFTParams(window_length=1024, hop_length=256, window_type='sqrt_hann')

# Path to the training data
# NOTE(review): hard-coded absolute path of one machine; adjust before running elsewhere.
fg_path = "C:\\Users\\Lukas\\nussl_tutorial\\train"

# Load the training data through the project-local `common.data` helper
train_data = data.on_the_fly(stft_params, transform=None, fg_path=fg_path, num_mixtures=100, coherent_prob=1)

In [None]:
# Helper functions for visualizing waveforms and spectrograms

def show_wav(sources):
    """Plot the waveforms of several sources in one 10x5 inch figure.

    A list is relabelled as a dict with keys 'Source 0', 'Source 1', ...;
    any other value is handed to nussl unchanged.
    """
    labelled = sources
    if isinstance(labelled, list):
        labelled = {f'Source {idx}': src for idx, src in enumerate(labelled)}
    plt.figure(figsize=(10, 5))
    plt.plot()
    nussl.core.utils.visualize_sources_as_waveform(labelled)
    plt.show()

def show_1wav(data):
    """Plot the waveform of a single signal in a new 10x5 inch figure and show it."""
    figure_size = (10, 5)
    plt.figure(figsize=figure_size)
    plt.plot()
    nussl.core.utils.visualize_waveform(data)
    plt.show()

def show_fre(sources):
    """Plot several sources as masks (dB cutoff -80) in one 10x5 inch figure.

    A list is relabelled as a dict with keys 'Source 0', 'Source 1', ...;
    any other value is handed to nussl unchanged.
    """
    labelled = sources
    if isinstance(labelled, list):
        labelled = {f'Source {idx}': src for idx, src in enumerate(labelled)}
    plt.figure(figsize=(10, 5))
    plt.plot()
    nussl.core.utils.visualize_sources_as_masks(labelled, db_cutoff=-80)
    plt.tight_layout()
    plt.show()

def show_1fre(data):
    """Plot the spectrogram of a single signal in a new 10x5 inch figure and show it."""
    figure_size = (10, 5)
    plt.figure(figsize=figure_size)
    plt.plot()
    nussl.core.utils.visualize_spectrogram(data)
    plt.show()


In [None]:
# Play back and plot one training example as a sanity check
item = train_data[0]
print(item.keys())
display(Audio(data=item['mix'].audio_data, rate=item['mix'].sample_rate))
show_wav(item['sources'])

In [None]:

# Pre-processing of the STFT spectrograms
def preprocess_spectrogram(magnitude, target_size=(512, 128)):
    """Return `magnitude` resized to `target_size` via `tf.image.resize`."""
    return tf.image.resize(magnitude, target_size)

# Extract the STFT magnitudes of the mix and the vocals
def prepare_data(data_item, stft_params):
    """Turn one dataset item into a (mix, vocals) pair of resized magnitude spectrograms.

    Args:
        data_item: mapping with a 'mix' entry and a 'sources' mapping that
            contains 'vocals'; both entries must provide a `.stft(...)` method.
        stft_params: object with `window_length`, `hop_length` and
            `window_type` attributes (e.g. `nussl.STFTParams`).

    Returns:
        Tuple `(mix_mag_resized, vocals_mag_resized)`, each the absolute value
        of the respective STFT passed through `preprocess_spectrogram`.
    """
    # Forward *all* STFT settings, including the window type, so the
    # spectrograms really match the configured `stft_params`.
    stft_kwargs = dict(
        window_length=stft_params.window_length,
        hop_length=stft_params.hop_length,
        window_type=stft_params.window_type,
    )

    mix_mag = np.abs(data_item['mix'].stft(**stft_kwargs))
    vocals_mag = np.abs(data_item['sources']['vocals'].stft(**stft_kwargs))

    # Resize to the fixed input size of the U-Net
    return preprocess_spectrogram(mix_mag), preprocess_spectrogram(vocals_mag)

# Example: pre-process the first item of the training dataset loaded above
mix_data, vocals_data = prepare_data(train_data[0], stft_params)


In [None]:
# SDR metric: ratio between the wanted signal and the residual noise, in dB
def sdr_metric(y_true, y_pred):
    """Signal-to-distortion ratio of `y_pred` with respect to `y_true`, in dB.

    Computes 10 * log10(sum(y_true^2) / sum((y_true - y_pred)^2)) over all
    elements of the inputs. A small epsilon is added to both powers so that
    a silent target or a perfect prediction yields a finite value instead of
    -inf / NaN / +inf.
    """
    eps = 1e-8  # keeps the ratio and the logarithm finite
    signal_power = tf.reduce_sum(y_true ** 2)
    noise_power = tf.reduce_sum((y_true - y_pred) ** 2)
    ratio = (signal_power + eps) / (noise_power + eps)
    # log10(x) expressed via the natural logarithm
    return 10 * tf.math.log(ratio) / tf.math.log(10.0)

In [10]:
# Predict every test item and compute the SDR
def evaluate_model_with_sdr(model, test_data, stft_params):
    """Print and return the mean `sdr_metric` of `model` over `test_data`.

    Every item is pre-processed with `prepare_data`, predicted as a batch of
    one, and scored against its target; the scores are averaged with
    `np.mean`.
    """
    def _score(sample):
        features, target = prepare_data(sample, stft_params)
        # Add a batch axis for predict() and strip it from the result again
        prediction = model.predict(np.expand_dims(features, axis=0))[0]
        return sdr_metric(target, prediction)

    sdr_scores = [_score(sample) for sample in test_data]

    # Average SDR over all items
    avg_sdr = np.mean(sdr_scores)
    print(f"Durchschnittlicher SDR: {avg_sdr:.2f} dB")
    return avg_sdr

In [None]:

def build_unet(input_shape=(512, 128, 1)):
    """Build the encoder/decoder network with skip connections.

    Encoder: five stride-2 5x5 convolutions (64, 128, 256, 512, 1024
    filters), each followed by batch normalisation and ReLU; the first three
    stages additionally apply Dropout(0.5).
    Decoder: four stride-2 5x5 transposed convolutions (512, 256, 128, 64
    filters) with batch normalisation and ReLU, each concatenated with the
    encoder output of matching depth, followed by a final transposed
    convolution with one sigmoid-activated output channel.
    """
    inputs = tf.keras.Input(shape=input_shape)

    # (filters, apply dropout?) for every encoder stage, shallow to deep
    encoder_spec = [(64, True), (128, True), (256, True), (512, False), (1024, False)]
    stage_outputs = []
    x = inputs
    for filters, use_dropout in encoder_spec:
        x = layers.Conv2D(filters, (5, 5), strides=(2, 2), padding='same')(x)
        x = layers.BatchNormalization()(x)
        x = layers.ReLU()(x)
        if use_dropout:
            x = layers.Dropout(0.5)(x)
        stage_outputs.append(x)

    # `x` is now the deepest encoder output; the remaining four stage outputs
    # serve as skip connections, consumed from deep to shallow.
    skips = reversed(stage_outputs[:-1])
    for filters, skip in zip((512, 256, 128, 64), skips):
        x = layers.Conv2DTranspose(filters, (5, 5), strides=(2, 2), padding='same')(x)
        x = layers.BatchNormalization()(x)
        x = layers.ReLU()(x)
        x = layers.Concatenate()([x, skip])

    outputs = layers.Conv2DTranspose(1, (5, 5), strides=(2, 2), padding='same', activation='sigmoid')(x)

    return tf.keras.Model(inputs, outputs)


# Initialise the model
model = build_unet(input_shape=(512, 128, 1))

# Adam with an explicit learning rate.
# The `Adam` name imported here is also used by the training cell further down.
from tensorflow.keras.optimizers import Adam

optimizer = Adam(learning_rate=0.001)

# Train on the mean squared error and report the SDR metric alongside it
model.compile(optimizer=optimizer, loss='mse', metrics=[sdr_metric])

# Print a summary of the model
model.summary()


In [None]:
# Prepare the training data
def create_training_data(data, stft_params):
    """Convert every item of `data` into network inputs and targets.

    Each item is passed through `prepare_data`; progress is printed as
    "<index> / <data.num_mixtures>" on a single, overwritten line.

    Returns:
        Tuple `(inputs, targets)` of NumPy arrays built from the collected
        mix and vocals spectrograms.
    """
    inputs, targets = [], []

    for i, data_item in enumerate(data, start=1):
        print(i, "/", data.num_mixtures, end='\r')

        mix, vocals = prepare_data(data_item, stft_params)
        inputs.append(mix)
        targets.append(vocals)

    # Stack the per-item results into arrays
    return np.array(inputs), np.array(targets)

# Build the training arrays from the training dataset loaded above
X_train, y_train = create_training_data(train_data, stft_params)


In [None]:
def mask_loss(mixture):
    """Create a loss that applies the predicted mask to `mixture` first.

    The returned `loss(y_true, y_pred)` multiplies `y_pred` with the first
    `batch_size` entries of `mixture` (the slice starts at index 0 on every
    axis, with `batch_size` taken from `y_pred`) and returns the mean squared
    difference between `y_true` and that product.
    """
    def loss(y_true, y_pred):
        n_in_batch = tf.shape(y_pred)[0]
        begin = [0, 0, 0, 0]
        size = [n_in_batch, -1, -1, -1]  # -1 keeps the full extent of an axis
        masked_mix = y_pred * tf.slice(mixture, begin, size)
        residual = y_true - masked_mix
        return tf.reduce_mean(tf.square(residual))
    return loss



In [None]:
def sdr_metric2(mixture):
    """Create an SDR metric (in dB) that applies the predicted mask to `mixture`.

    The returned `metric(y_true, y_pred)` multiplies `y_pred` with the first
    `batch_size` entries of `mixture` and computes
    10 * log10(sum(y_true^2) / sum((y_true - masked)^2)).

    `mixture` is converted to a tensor once, so the slice with a tensor-valued
    `batch_size` also works when `mixture` is a NumPy array and the metric is
    traced in graph mode. A small epsilon keeps the result finite for silent
    targets and perfect predictions.
    """
    mixture_tensor = tf.convert_to_tensor(mixture)
    eps = 1e-8  # keeps the ratio and the logarithm finite

    def metric(y_true, y_pred):
        batch_size = tf.shape(y_pred)[0]
        current_mixture = mixture_tensor[:batch_size]
        after_mask = y_pred * current_mixture
        signal_power = tf.reduce_sum(y_true ** 2)
        noise_power = tf.reduce_sum((y_true - after_mask) ** 2)
        ratio = (signal_power + eps) / (noise_power + eps)
        # log10(x) expressed via the natural logarithm
        return 10 * tf.math.log(ratio) / tf.math.log(10.0)
    return metric

In [26]:
def perceptual_loss(y_true, y_pred, extractor, mixture):
    """Mean squared difference between extracted features of target and masked mix.

    `y_pred` is multiplied with `mixture`; `extractor` is applied to `y_true`
    and to that product, and the mean of the squared feature difference is
    returned.
    """
    masked_mix = y_pred * mixture
    feature_gap = extractor(y_true) - extractor(masked_mix)
    return tf.reduce_mean(tf.square(feature_gap))

# Example feature extractor based on the STFT
def stft_extractor(x):
    """Return `tf.signal.stft` of `x` with frame length 512 and frame step 128."""
    return tf.signal.stft(x, frame_length=512, frame_step=128)
#model.compile(optimizer='adam', loss=lambda y_true, y_pred: perceptual_loss(y_true, y_pred, stft_extractor))


In [None]:

class LossPlotCallback(tf.keras.callbacks.Callback):
    """Keras callback that records the loss after every training batch.

    When training ends, the recorded values are plotted against the batch
    index on a logarithmic y axis.
    """

    def __init__(self):
        super().__init__()
        self.batch_losses = []  # loss values in the order they were reported

    def on_train_batch_end(self, batch, logs=None):
        """Store the 'loss' entry of `logs`, if there is one."""
        # `logs` defaults to None, so guard before looking up the loss.
        loss_value = (logs or {}).get('loss')
        if loss_value is not None:
            self.batch_losses.append(loss_value)

    def on_train_end(self, logs=None):
        """Plot all recorded losses with a logarithmic y axis."""
        plt.figure(figsize=(10, 6))
        plt.plot(self.batch_losses, 'b-', label='Batch Loss')
        plt.yscale('log')  # logarithmic scale for the y axis
        plt.title('Loss-Verlauf während des Trainings (logarithmisch)')
        plt.xlabel('Batch')
        plt.ylabel('Loss (log)')
        plt.legend()
        plt.show()


In [None]:
# Fresh Adam optimizer with an explicit learning rate
optimizer = Adam(learning_rate=0.001)

# Mixture spectrograms, only needed by the commented-out mask loss / metric below
mixture = X_train
#model.compile(optimizer='adam', loss=mask_loss(mixture), metrics=[sdr_metric2( mixture)])

# (Re-)compile with plain MSE loss and the SDR metric
model.compile(optimizer=optimizer, loss='mse', metrics=[sdr_metric])

# Records the loss per batch and plots it when training ends
loss_plot_callback = LossPlotCallback()

# Train the model; 10 % of the data is held out for validation
history = model.fit(X_train, y_train, batch_size=4, epochs=15, validation_split=0.1, callbacks=[loss_plot_callback])


In [None]:

# Pull the per-epoch training and validation loss out of the fit history
loss = history.history['loss']
val_loss = history.history['val_loss']
epochs = range(1, len(loss) + 1)

# Plot both curves on one set of axes
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(epochs, loss, 'bo-', label='Training Loss')
ax.plot(epochs, val_loss, 'ro-', label='Validation Loss')
ax.set_title('Loss-Verlauf während des Trainings')
ax.set_xlabel('Epoche')
ax.set_ylabel('Loss')
ax.legend()
plt.show()

In [None]:
# Load the test data
# NOTE(review): hard-coded absolute path of one machine; adjust before running elsewhere.
test_path = "C:\\Users\\Lukas\\nussl_tutorial\\test"
test_data = data.on_the_fly(stft_params, transform=None, fg_path=test_path, num_mixtures=100)

# Pick one example item (the training item is only used by the commented-out alternatives)
song_id = 20
test_item = test_data[song_id]
train_item = train_data[song_id]

# Extract and pre-process the mix and the target (vocals)
mix_mag, vocals_mag = prepare_data(test_item, stft_params)
#mix_mag, vocals_mag = prepare_data(train_item, stft_params)

# Model prediction: add a batch axis for predict(), then remove it again
predicted_mask = model.predict(tf.expand_dims(mix_mag, axis=0))
predicted_mask = np.squeeze(predicted_mask, axis=0)

# Complex STFT of the mix, computed with the same settings as everywhere else
mix_signal = test_item['mix']
#mix_signal = train_item['mix']
mix_stft = mix_signal.stft(
    window_length=stft_params.window_length,
    hop_length=stft_params.hop_length,
    window_type=stft_params.window_type,
)

# Bring the mask to the (frequency, time) size of the mix STFT
predicted_mask_resized = tf.image.resize(predicted_mask, mix_stft.shape[:2])

# Apply the mask in NumPy: the mask is a real-valued TF tensor, the STFT a
# complex NumPy array, so multiplying on the NumPy side avoids relying on
# TensorFlow's handling of mixed dtypes.
predicted_stft = predicted_mask_resized.numpy() * mix_stft

# Reconstruct the estimated vocals with the matching inverse STFT
audio_vocal = nussl.AudioSignal()
audio_vocal.stft_data = predicted_stft
audio_vocal.istft(
    window_length=stft_params.window_length,
    hop_length=stft_params.hop_length,
    window_type=stft_params.window_type,
)

# Reference item for the comparison cells below: reuse exactly the item that
# was fed to the model instead of drawing it from the dataset a second time.
item = test_item
#item = train_item

In [None]:
# Wrap the resized mask in an AudioSignal so it can be drawn with show_1fre
# NOTE(review): the mask is not a real STFT; the istft call presumably only
# exists to give the signal audio data for plotting — confirm.
mask = nussl.AudioSignal()
mask.stft_data = predicted_mask_resized.numpy()
mask.istft(window_length=512, hop_length=128)
show_1fre(mask)

In [None]:
# Average SDR of the model over all items of the test dataset
evaluate_model_with_sdr(model, test_data, stft_params)

In [None]:
# Original data: play the mix and the vocals, then plot all sources
print('Mix:')
display(Audio(data=item['mix'].audio_data, rate=item['mix'].sample_rate))
print('Vocals:')
display(Audio(data=item['sources']['vocals'].audio_data, rate=item['mix'].sample_rate))
#print(item.keys())
show_wav(item['sources'])
show_fre(item['sources'])

In [None]:
# Audio estimated by the model, played at the sample rate of the mix
print('nach model:')
display(Audio(data=audio_vocal.audio_data, rate=item['mix'].sample_rate))

# Waveform: target vocals vs. model output
print('Ziel Amplitudenverlauf:')
show_1wav(data=item['sources']['vocals'])
print('Amplitudenverlauf nach Model:')
show_1wav(audio_vocal)

# Spectrogram: target vocals vs. model output
print('Ziel Spektogram:')
show_1fre(data=item['sources']['vocals'])
print('Spektogram nach Model:')
show_1fre(audio_vocal)

In [None]:
# Save the model to an HDF5 file in the current working directory
model.save("t.h5")
print("Modell erfolgreich gespeichert!")

In [None]:
# Load a saved model; the custom metric has to be registered by name.
# NOTE(review): this file name differs from the "t.h5" written by the save cell — confirm which file is intended.
model = tf.keras.models.load_model("24_11_18_unet_4.34.h5", custom_objects={'sdr_metric': sdr_metric})
print("Modell erfolgreich geladen!")