Note: This work took place in May-Aug of 2022. It just took me this long to finally finish writing this (Too busy playing with my SRD 😅)
Last year I found several vulnerabilities in the XNU source code using AI. My actual goal was to better understand NLUs, but I ended up with a very nice double win! I had started working at an AI startup (Moveworks.com - it's pretty awesome! [I'm obviously not biased 😉]) and wanted a better understanding of how this all worked. And there is no better way to learn something than doing the work yourself, so you understand not only the how but, more importantly, the why.
While understanding how NLUs worked was my main goal, I also wanted to gain insight and provide data for the following questions:
- Can I understand NLPs & NLUs well enough to not look like a complete idiot at work?
- How good is AI at finding bugs?
- How does it compare to joern, codeql, ripgrep, and grep?
- How likely am I to find bugs in well-audited open source code such as XNU?
Understanding NLPs & NLUs
Of course, in my use case there is less structure to wrestle with than in actual natural language, since C/C++ is fairly restrictive compared to how we communicate human to human.
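The basic building block everything below relies on is masked-token prediction: hide a token, ask the model what most likely belongs there. As a warm-up, here is a minimal sketch using the stock roberta-base checkpoint (an off-the-shelf, general-purpose model - just an illustration, not anything code-specific yet); the later sections do exactly the same thing with models trained on source code.
#!/usr/bin/python3
# Minimal fill-mask sketch: the model predicts the most likely token for <mask>
# given the surrounding context. Uses the generic roberta-base checkpoint.
from transformers import pipeline

fill_mask = pipeline('fill-mask', model='roberta-base')
for output in fill_mask("The kernel copies data from user <mask> into kernel memory.", top_k=3):
    print(output['token_str'], output['score'])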
Stage 1: Fight! (with OpenAI)
import os
import openai
openai.api_key = os.getenv("OPENAI_API_KEY")
response = openai.Completion.create(
    model="text-davinci-002",
    prompt="Is there a vulnerability in this code? If so write the line of vulnerable code out in your response and tell me why it's vulnerable\n\nCODESNIPPETHERE",
    temperature=0.7,
    max_tokens=1500,
    top_p=1,
    frequency_penalty=0,
    presence_penalty=0)
import os
import openai
openai.api_key = os.getenv("OPENAI_API_KEY")
response = openai.Completion.create(
    model="text-davinci-002",
    prompt="Is there a vulnerability in this code? If so write the line of vulnerable code out in your response and tell me why it's vulnerable\n\nint\nspec_kqfilter(vnode_t vp, struct knote *kn, struct kevent_qos_s *kev)\n{\n\tdev_t dev;\n\tassert(vnode_ischr(vp));\n\tdev = vnode_specrdev(vp);\n\n#if NETWORKING\n\t/*\n\t * Try a bpf device, as defined in bsd/net/bpf.c\n\t * If it doesn't error out the attach, then it\n\t * claimed it. Otherwise, fall through and try\n\t * other attaches.\n\t */\n\tint32_t tmp_flags = kn->kn_flags;\n\tint64_t tmp_sdata = kn->kn_sdata;\n\tint res;\n\tres = bpfkqfilter(dev, kn);\n\tif ((kn->kn_flags & EV_ERROR) == 0) {\n\t\treturn res;\n\t}\n\tkn->kn_flags = tmp_flags;\n\tkn->kn_sdata = tmp_sdata;\n#endif\n\tif (major(dev) > nchrdev) {\n\t\tknote_set_error(kn, ENXIO);\n\t\treturn 0;\n\t}\n\tkn->kn_vnode_kqok = !!(cdevsw_flags[major(dev)] & CDEVSW_SELECT_KQUEUE);\n\tkn->kn_vnode_use_ofst = !!(cdevsw_flags[major(dev)] & CDEVSW_USE_OFFSET);\n\tif (cdevsw_flags[major(dev)] & CDEVSW_IS_PTS) {\n\t\tkn->kn_filtid = EVFILTID_PTSD;\n\t\treturn ptsd_kqfilter(dev, kn);\n\t} else if (cdevsw_flags[major(dev)] & CDEVSW_IS_PTC) {\n\t\tkn->kn_filtid = EVFILTID_PTMX;\n\t\treturn ptmx_kqfilter(dev, kn);\n\t} else if (cdevsw[major(dev)].d_type == D_TTY && kn->kn_vnode_kqok) {\n\t\t/*\n\t\t * TTYs from drivers that use struct ttys use their own filter\n\t\t * routines. The PTC driver doesn't use the tty for character\n\t\t * counts, so it must go through the select fallback.\n\t\t */\n\t\tkn->kn_filtid = EVFILTID_TTY;\n\t\treturn knote_fops(kn)->f_attach(kn, kev);\n\t}\n\t/* Try to attach to other char special devices */\n\treturn filt_specattach(kn, kev);\n}",
    temperature=0.7,
    max_tokens=1500,
    top_p=1,
    frequency_penalty=0,
    presence_penalty=0)
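Neither snippet shows pulling the answer back out; with the 2022-era openai Python library, the generated text lives in the response's choices. Roughly (a sketch, not my exact harness):
# Legacy (pre-1.0) openai library: the completion text is in choices[0].text
print(response["choices"][0]["text"])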
ptsd_open was the vulnerable function; more info can be found in the jailbreak wiki in the 'Write-up by p0sixninja' section. The TL;DR: Apple didn't properly check that the device passed in fell within the proper minor range, resulting in code execution. Equipped with this knowledge (after a few hours of googling 😅 to refresh my memory as to why my gut started screaming that this was on to something), I found the above article and it all clicked into place. This led me to run through the split functions for about 8 hours, and it almost landed on what was actually wrong: the check should have been major(dev) >= nchrdev instead of major(dev) > nchrdev.
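To spell out why > versus >= matters: cdevsw holds nchrdev entries, so valid indices stop at nchrdev - 1, and a check of major(dev) > nchrdev still lets major(dev) == nchrdev through to index one element past the end. A toy Python sketch of the same off-by-one (names are mine, not XNU's):
# Toy illustration of the off-by-one: an array with nchrdev entries has valid
# indices 0 .. nchrdev - 1, so rejecting only idx > nchrdev is not enough.
nchrdev = 4
cdevsw = ["dev%d" % i for i in range(nchrdev)]
idx = nchrdev                    # the boundary case the weak check lets through

print(idx > nchrdev)             # False -> the vulnerable check does NOT reject it
print(idx >= nchrdev)            # True  -> the fixed check does reject it
try:
    cdevsw[idx]                  # what the weak check would allow
except IndexError as e:
    print("out-of-bounds access:", e)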
Now that I had some results, it was time to step up my game: how do I do this locally without relying solely on text-davinci-002?

OpenAI? We have OpenAI at home
#!/usr/bin/python3
from transformers import RobertaTokenizer, RobertaForMaskedLM, pipeline
model = RobertaForMaskedLM.from_pretrained('neulab/codebert-cpp')
tokenizer = RobertaTokenizer.from_pretrained('neulab/codebert-cpp')
code_example ="""
if (major(dev) <mask> nchrdev) {
knote_set_error(kn, ENXIO);
return 0;
}
"""
code_ref="""
if (major(dev) > nchrdev) {
knote_set_error(kn, ENXIO);
return 0;
}
"""
fill_mask = pipeline('fill-mask', model=model, tokenizer=tokenizer)
outputs = fill_mask(code_example,top_k=5)
for output in outputs:
    if (output['sequence'] != code_ref):
        print(output)
{'score': 0.7711489200592041, 'token': 49333, 'token_str': '!=', 'sequence': '\n if (major(dev)!= nchrdev) {\n knote_set_error(kn, ENXIO);\n return 0;\n }\n'}
{'score': 0.11615779250860214, 'token': 28696, 'token_str': ' <', 'sequence': '\n if (major(dev) < nchrdev) {\n knote_set_error(kn, ENXIO);\n return 0;\n }\n'}
{'score': 0.043150343000888824, 'token': 49095, 'token_str': ' >=', 'sequence': '\n if (major(dev) >= nchrdev) {\n knote_set_error(kn, ENXIO);\n return 0;\n }\n'}
{'score': 0.030456306412816048, 'token': 45994, 'token_str': ' ==', 'sequence': '\n if (major(dev) == nchrdev) {\n knote_set_error(kn, ENXIO);\n return 0;\n }\n'}
Fine, I'll do it myself!
#!/usr/bin/python3.9
#The MAJORITY of this code is from the neulab code-bert-score repo
from transformers import AutoTokenizer, AutoModelForMaskedLM, TrainingArguments, Trainer, DataCollatorForLanguageModeling
from datasets import load_dataset
import numpy as np
import evaluate
import torch

def compute_metrics(eval_preds):
    preds, labels = eval_preds
    # preds have the same shape as the labels, after the argmax(-1) has been calculated
    # by preprocess_logits_for_metrics
    labels = labels.reshape(-1)
    preds = preds.reshape(-1)
    mask = labels != -100
    labels = labels[mask]
    preds = preds[mask]
    return metric.compute(predictions=preds, references=labels)

def preprocess_logits_for_metrics(logits, labels):
    if isinstance(logits, tuple):
        # Depending on the model and config, logits may contain extra tensors,
        # like past_key_values, but logits always come first
        logits = logits[0]
    return logits.argmax(dim=-1)

def tokenize_function(examples):
    # drop empty / whitespace-only entries before tokenizing
    examples["code"] = [line for line in examples["code"] if len(line) > 0 and not line.isspace()]
    return tokenizer(examples["code"], padding="max_length", truncation=True, max_length=512, return_special_tokens_mask=True)

tokenizer = AutoTokenizer.from_pretrained("neulab/codebert-c")
model = AutoModelForMaskedLM.from_pretrained("neulab/codebert-c")
model.resize_token_embeddings(len(tokenizer))
device = torch.device("cuda")
model.to(device)

# Standard masked-language-modeling collator: randomly masks 15% of the tokens
data_collator = DataCollatorForLanguageModeling(
    tokenizer=tokenizer,
    mlm=True,
    mlm_probability=.15,
)
training_args = TrainingArguments(output_dir="test_trainer", evaluation_strategy="epoch", max_steps=100000)

#w/o streaming you need much larger than 1 TB of space for all the data
train_dataset = load_dataset("codeparrot/github-code-clean", streaming=True, split='train', languages=['C','C++'])
with training_args.main_process_first(desc="dataset map tokenization"):
    token_train_dataset = train_dataset.map(
        function=tokenize_function,
        batched=True,
        remove_columns="code",
    )

#need 2 of these since IterableDataset doesn't support train_test_split - at least not yet!
#Likely some bias introduced due to validation + training data overlapping
eval_dataset = load_dataset("codeparrot/github-code-clean", streaming=True, split='train', languages=['C','C++'])
with training_args.main_process_first(desc="dataset map tokenization"):
    token_eval_dataset = eval_dataset.map(
        tokenize_function,
        batched=True,
        remove_columns="code",
    )

metric = evaluate.load("accuracy")
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=token_train_dataset,
    tokenizer=tokenizer,
    data_collator=data_collator,
    eval_dataset=token_eval_dataset,
    compute_metrics=compute_metrics,
    preprocess_logits_for_metrics=preprocess_logits_for_metrics
)
#insert checkpoints here if you want to use checkpoints
trainer.train()
trainer.save_model("test_trainer/newModel")
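About that checkpoint comment: Trainer periodically writes checkpoints into output_dir as it trains, and trainer.train() can resume from the most recent one instead of starting over, which matters a lot for a multi-day run. A rough sketch (not the exact invocation I used):
# Resume from the latest checkpoint under output_dir ("test_trainer" here)
# rather than restarting a ~9 day run from zero.
trainer.train(resume_from_checkpoint=True)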
This ran for ~9 days straight on a 3090 at pretty close to max capacity the entire time, but it finally finished!

Accurate representation of how I felt as my room 'warmed' up

Thankfully (and much to my family's joy), I didn't have to run it again. Now that we've built it, let's try the new model!
#!/usr/bin/python3
from transformers import RobertaTokenizer, RobertaForMaskedLM, pipeline
model = RobertaForMaskedLM.from_pretrained('test_trainer/newModel')
tokenizer = RobertaTokenizer.from_pretrained('test_trainer/newModel')
code_example ="""
if (major(dev) <mask> nchrdev) {
knote_set_error(kn, ENXIO);
return 0;
}
"""
code_ref="""
if (major(dev) > nchrdev) {
knote_set_error(kn, ENXIO);
return 0;
}
"""
fill_mask = pipeline('fill-mask', model=model, tokenizer=tokenizer)
outputs = fill_mask(code_example,top_k=5)
for output in outputs:
    if (output['sequence'] != code_ref):
        print(output)
Which now gives us:
{'score': 0.6952677965164185, 'token': 49333, 'token_str': '!=', 'sequence': '\n if (major(dev)!= nchrdev) {\n knote_set_error(kn, ENXIO);\n return 0;\n }\n'}
{'score': 0.08862753957509995, 'token': 49095, 'token_str': ' >=', 'sequence': '\n if (major(dev) >= nchrdev) {\n knote_set_error(kn, ENXIO);\n return 0;\n }\n'}
{'score': 0.07690384238958359, 'token': 28696, 'token_str': ' <', 'sequence': '\n if (major(dev) < nchrdev) {\n knote_set_error(kn, ENXIO);\n return 0;\n }\n'}
{'score': 0.016858655959367752, 'token': 45994, 'token_str': ' ==', 'sequence': '\n if (major(dev) == nchrdev) {\n knote_set_error(kn, ENXIO);\n return 0;\n }\n'}
But does it work?
Of course, there was a problem: a lot of the output I was getting looked like this:
['', '{', '(', '_']
['', 'if', '>', '(']
Stuck? Just give it another shot 😀
#!/usr/bin/python3
from transformers import RobertaTokenizer, RobertaForMaskedLM, pipeline
import torch
import random
#numMasksToInsert=random.randrange(0,25)
numMasksToInsert=11
model = RobertaForMaskedLM.from_pretrained('test_trainer/newModel')
tokenizer = RobertaTokenizer.from_pretrained('test_trainer/newModel')
maskStringConstant="<mask>"
maskReplacementString="MASKREPLACEME"
#based on code from https://ramsrigoutham.medium.com/sized-fill-in-the-blank-or-multi-mask-filling-with-roberta-and-huggingface-transformers-58eb9e7fb0c
def get_prediction(sent):
    token_ids = tokenizer.encode(sent, return_tensors='pt')
    masked_position = (token_ids.squeeze() == tokenizer.mask_token_id).nonzero()
    masked_pos = [mask.item() for mask in masked_position]
    with torch.no_grad():
        output = model(token_ids)
    last_hidden_state = output[0].squeeze()
    list_of_list = []
    for index, mask_index in enumerate(masked_pos):
        # take the single most likely token for each mask position
        mask_hidden_state = last_hidden_state[mask_index]
        idx = torch.topk(mask_hidden_state, k=1, dim=0)[1]
        words = [tokenizer.decode(i.item()).strip() for i in idx]
        list_of_list.append(words)
        #print ("Mask ",index+1,"Guesses : ",words)
    best_guess = ""
    for j in list_of_list:
        best_guess = best_guess + " " + j[0]
    return best_guess
code_example ="""
static int
mt_cdev_open(dev_t devnum, __unused int flags, __unused int devtype,
__unused proc_t p)
{
int error = 0;
MASKREPLACEME
mt_device_t dev = mt_get_device(devnum);
mt_device_lock(dev);
if (dev->mtd_inuse) {
error = EBUSY;
} else {
dev->mtd_inuse = true;
}
mt_device_unlock(dev);
return error;
}
"""
newCode=code_example.replace(maskReplacementString,maskStringConstant*numMasksToInsert)
predicted_mask = get_prediction(newCode)
predicted_maskList = predicted_mask.split(" ")
print("predicted_maskList is %s" %(predicted_maskList))
newCode=newCode.replace(maskStringConstant,predicted_mask,1)
if "if" in predicted_mask and "(" in predicted_mask and ")" in predicted_mask and "{" in predicted_mask:
#fix up if statement if we find one, include a few masks in the event the AI gives us "blanks"
#TODO: Are these actual masks or am I smokin something?
updateCodeSnippet=predicted_mask.replace("_",maskStringConstant).replace(" ","")
#Use our original code to insert the newly updated snippet into
newerCode=code_example.replace(maskReplacementString,updateCodeSnippet)
second_predicted_mask=get_prediction(newerCode)
second_predicted_maskList = second_predicted_mask.split(" ")
print("predicted_maskList is %s" %(second_predicted_maskList))
newestCode=newerCode.replace(maskStringConstant,second_predicted_mask,1)
#TODO: Probably shouldn't do this?
#any masks leftover? Ignore for easier grepping
newestCode=newestCode.replace(maskStringConstant,"")
print("GREPFORME: ",newestCode)
else:
print("no if statement, only perform one round of predicting")
print("GREPFORME: ",newCode)
predicted_maskList is ['', '', '', 'if', '(', '_', '_', '_', '_', ')', '', '{']
predicted_maskList is ['', 'dev', 'num', '==', '0']
GREPFORME:
static int
mt_cdev_open(dev_t devnum, __unused int flags, __unused int devtype,
__unused proc_t p)
{
int error = 0;
if( dev num == 0){
mt_device_t dev = mt_get_device(devnum);
mt_device_lock(dev);
if (dev->mtd_inuse) {
error = EBUSY;
} else {
dev->mtd_inuse = true;
}
mt_device_unlock(dev);
return error;
}
The model wanting a guard on devnum before the call drew my attention to mt_get_device and how it handles the device number it's given; after taking a closer look, I confirmed it was indeed vulnerable. This became CVE-2022-32944, and was the most substantial bug to fall out of this adventure.