How do you implement Model Monitoring for an image dataset?

In my code I am generating synthetic ground truth and invoking the model as follows:

import uuid
from datetime import datetime

import numpy as np
import pandas as pd
from sagemaker.s3 import S3Uploader

def generate_load_and_ground_truth():
    df = pd.read_csv('validation_with_predictions.csv')
    gt_records = []
    for i, row in df.iterrows():
        suffix = uuid.uuid1().hex
        inference_id = f'{i}-{suffix}'
        # X_test and predictor are defined earlier in the notebook
        data = np.array([X_test[i]])
        payload = {'instances': data}
        args = {'InferenceId': inference_id}
        out = predictor.predict(data=payload, initial_args=args)
        gt_records.append(str({
            "groundTruthData": {
                "data": str(df['label'][i]),
                "encoding": 'CSV',
            },
            "eventMetadata": {
                "eventId": str(inference_id),
            },
            "eventVersion": "0",
        }))
    upload_ground_truth(gt_records, ground_truth_upload_path, datetime.utcnow())


def upload_ground_truth(records, path, upload_time):
    data_to_upload = ",".join(records)
    target_s3_uri = f"{path}/{upload_time:%Y/%m/%d/%H/%M%S}.jsonl"
    print(f"Uploading {len(records)} records to", target_s3_uri)
    S3Uploader.upload_string_as_file_body(data_to_upload, target_s3_uri)

When the monitoring schedule ran, it failed with:

'MonitoringExecutionStatus': 'Failed',
'FailureReason': 'Algorithm Error: See Job Logs for more information.'

Looking into the CloudWatch logs, I found the error to be:
'Cannot resolve column name "groundTruthMetadata.eventId" among (_corrupt_record);'
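
A plausible cause (an assumption based on the code above, not something confirmed in the thread) is that each record is serialized with str(), which produces single-quoted Python repr rather than valid JSON, and that the records are joined with commas instead of newlines, so the merge job reads the whole .jsonl file as one corrupt record. A corrected sketch of the upload:

import json

def upload_ground_truth(records, path, upload_time):
    # JSON Lines means one JSON object per line, separated by newlines, not commas
    data_to_upload = "\n".join(records)
    target_s3_uri = f"{path}/{upload_time:%Y/%m/%d/%H/%M%S}.jsonl"
    print(f"Uploading {len(records)} records to", target_s3_uri)
    S3Uploader.upload_string_as_file_body(data_to_upload, target_s3_uri)

# ...and inside generate_load_and_ground_truth, build each record with
# json.dumps so the stored line is valid JSON, not a Python repr:
gt_records.append(json.dumps({
    "groundTruthData": {"data": str(df['label'][i]), "encoding": "CSV"},
    "eventMetadata": {"eventId": str(inference_id)},
    "eventVersion": "0",
}))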

Then I used this preprocessing function as the record preprocessor for the monitoring schedule:

import json
def preprocess_handler(inference_record):
    input_dict = json.loads(inference_record.endpoint_input.data)
    output_dict = json.loads(inference_record.endpoint_output.data)
    input_data = str(input_dict['instances'].reshape(3072))[1:-1]
    output_data = str(np.argmax(output_dict['predictions'][0]))
    return_dict = {'prediction000':output_data, 'feature000':input_data}
    return return_dict

It gives the error:
'FailureReason': 'InternalServerError: We encountered an internal error. Please try again.'}

Sample data in inference_record is:

{
	"captureData": {
		"endpointInput": {
			"observedContentType": "application/json",
			"mode": "INPUT",
			"data": data, 
#"{'instances': [[[[0.6196078658103943, 0.43921568989753723, 0.1921568661928177]]]]}",  
# SAMPLE OF data_lst VERSION; dict with 4 dimensional array
			"encoding": "JSON"
		},
		"endpointOutput": {
			"observedContentType": "application/json",
			"mode": "OUTPUT",
			"data": "{\n    \"predictions\": [[0.000721988094, 0.000489010592, 0.0307604838, 0.291437089, 0.0597994663, 0.462541133, 0.110468164, 0.041162733, 0.00192647439, 0.000693516282]\n    ]\n}",
			"encoding": "JSON"
		}
	},
	"eventMetadata": {
		"eventId": "eb71956e-9d99-4bfe-a35f-6d5a33c7e701",
		"inferenceId": "155-a02fa0aaf86211ec9a027f52933c247f",
		"inferenceTime": "2022-06-30T10:51:41Z"
	},
	"eventVersion": "0"
}
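
One possible explanation for the InternalServerError (an assumption; the monitoring job logs are not shown here) is that json.loads returns plain Python lists, which have no .reshape method, and that the script uses np without importing numpy, so preprocess_handler crashes inside the container. A corrected sketch:

import json
import numpy as np

def preprocess_handler(inference_record):
    input_dict = json.loads(inference_record.endpoint_input.data)
    output_dict = json.loads(inference_record.endpoint_output.data)
    # json.loads yields nested Python lists; wrap them in np.array before reshaping
    flat = np.array(input_dict['instances']).reshape(3072)
    # str(list)[1:-1] produces a comma-separated string of the 3072 pixel values
    input_data = str(flat.tolist())[1:-1]
    output_data = str(np.argmax(output_dict['predictions'][0]))
    return {'prediction000': output_data, 'feature000': input_data}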

@Shradaya could you please share a bit more information on how you have deployed your model: which model, which version, and whether you are using a pipeline or a custom inference.py?

I am using a custom model created with TensorFlow:

import tensorflow as tf
from tensorflow.keras.layers import (BatchNormalization, Conv2D, Dense,
                                     Dropout, Flatten, MaxPooling2D)
from tensorflow.keras.models import Sequential
from tensorflow.keras.preprocessing.image import ImageDataGenerator

with tf.device(DEVICE):
        # Data Augmentation
        TRAIN_BATCH_SIZE = 32
        data_generator = ImageDataGenerator(width_shift_range=0.1, height_shift_range=0.1, horizontal_flip=True)
        train_iterator = data_generator.flow(X_train, y_train, batch_size=TRAIN_BATCH_SIZE)
        
        # Define Model Architecture
        model = Sequential()
        
        # CONVOLUTIONAL LAYER 1
        model.add(Conv2D(filters=16, kernel_size=2, padding='same', activation='relu', input_shape=(32, 32, 3)))
        model.add(BatchNormalization())
        model.add(MaxPooling2D(pool_size=2))

        # CONVOLUTIONAL LAYER 2
        model.add(Conv2D(filters=32, kernel_size=2, padding='same', activation='relu'))
        model.add(BatchNormalization())
        model.add(MaxPooling2D(pool_size=2))

        # CONVOLUTIONAL LAYER 3
        model.add(Conv2D(filters=64, kernel_size=2, padding='same', activation='relu'))
        model.add(BatchNormalization())
        model.add(MaxPooling2D(pool_size=2))
        model.add(Dropout(0.3))

        # FULLY CONNECTED LAYER 
        model.add(Flatten())
        model.add(Dense(500, activation='relu'))
        model.add(Dropout(0.4))
        model.add(Dense(10, activation='softmax'))
        model.summary()
        
        # Compile Model
        model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
        
        # Train Model
        BATCH_SIZE = 32
        STEPS_PER_EPOCH = int(X_train.shape[0]/TRAIN_BATCH_SIZE)
        
        model.fit(train_iterator, 
                  steps_per_epoch=STEPS_PER_EPOCH, 
                  batch_size=BATCH_SIZE, 
                  epochs=epochs, 
                  validation_data=(X_validation, y_validation), 
                  callbacks=[], 
                  verbose=2, 
                  shuffle=True)
        
        # Evaluate on Test Set
        result = model.evaluate(X_test, y_test, verbose=1)
        logger.info(f'Test Accuracy: {result[1]}')
        loss = 1 - result[1]  # int() would truncate the accuracy to 0 or 1
        logger.info(f'loss : {loss}')

I am deploying the model using the SageMaker Python SDK as follows:

endpoint_name = f'tensorflow-cv-{int(time.time())}'
predictor = model.deploy(initial_instance_count=1,
                         instance_type='ml.m5.xlarge',
                         endpoint_name=endpoint_name,
                         data_capture_config=data_capture_configuration)
print(f"\nSuccessfully deployed at {endpoint_name}...")

I am not using a pipeline or an inference.py file. I am just passing a record_preprocessor_script when creating the monitoring schedule:

my_default_monitor.create_monitoring_schedule(
    monitor_schedule_name=mon_schedule_name,
    endpoint_input=predictor.endpoint,
    record_preprocessor_script=preprocessor_s3_dest,
    output_s3_uri=s3_report_path,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)
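
preprocessor_s3_dest is not defined in the thread; presumably it is the S3 URI of the uploaded preprocessor script. A sketch, assuming the script is saved locally as preprocessor.py and using a placeholder bucket:

from sagemaker.s3 import S3Uploader

# local file containing the preprocess_handler above; bucket/prefix are placeholders
preprocessor_s3_dest = S3Uploader.upload(
    local_path="preprocessor.py",
    desired_s3_uri="s3://my-bucket/model-monitor/preprocessor",
)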

The pre-processing script is the same preprocess_handler shown above.

This doesn’t look like an issue related to Hugging Face or Transformers, and I am not an expert in TensorFlow on SageMaker. It might make more sense to open an issue in the SageMaker GitHub repository: GitHub - aws/sagemaker-python-sdk: A library for training and deploying machine learning models on Amazon SageMaker

Thank you for looking into the issue. :smile:
